Convert CephInstance to new zkhandler

Joshua Boniface 2021-05-31 19:51:27 -04:00
parent f6d0e89568
commit ef5fe78125
2 changed files with 43 additions and 36 deletions


@@ -23,20 +23,19 @@ import time
 import json
 import psutil
-import pvcnoded.zkhandler as zkhandler
 import pvcnoded.common as common


 class CephOSDInstance(object):
-    def __init__(self, zk_conn, this_node, osd_id):
-        self.zk_conn = zk_conn
+    def __init__(self, zkhandler, this_node, osd_id):
+        self.zkhandler = zkhandler
         self.this_node = this_node
         self.osd_id = osd_id
         self.node = None
         self.size = None
         self.stats = dict()

-        @self.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id))
         def watch_osd_node(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
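
The watcher registration now reaches the Kazoo connection through the handler object (zkhandler.zk_conn) instead of holding the raw connection directly. For orientation, a minimal sketch of the shape this implies for the new ZKHandler, assuming Kazoo underneath; the class name matches the diff, but the constructor and its hosts argument are illustrative, not the actual PVC implementation:

    from kazoo.client import KazooClient

    class ZKHandler(object):
        def __init__(self, hosts):
            # Underlying Kazoo client, kept public so callers can still
            # register watches via zkhandler.zk_conn.DataWatch(...)
            self.zk_conn = KazooClient(hosts=hosts)
            self.zk_conn.start()
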
@@ -51,7 +50,7 @@ class CephOSDInstance(object):
             if data and data != self.node:
                 self.node = data

-        @self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
         def watch_osd_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -67,7 +66,7 @@ class CephOSDInstance(object):
                 self.stats = json.loads(data)

-def add_osd(zk_conn, logger, node, device, weight):
+def add_osd(zkhandler, logger, node, device, weight):
     # We are ready to create a new OSD on this node
     logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
     try:
@@ -174,12 +173,12 @@ def add_osd(zk_conn, logger, node, device, weight):
         # 7. Add the new OSD to the list
         logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
-        zkhandler.writedata(zk_conn, {
-            '/ceph/osds/{}'.format(osd_id): '',
-            '/ceph/osds/{}/node'.format(osd_id): node,
-            '/ceph/osds/{}/device'.format(osd_id): device,
-            '/ceph/osds/{}/stats'.format(osd_id): '{}'
-        })
+        zkhandler.write([
+            ('/ceph/osds/{}'.format(osd_id), ''),
+            ('/ceph/osds/{}/node'.format(osd_id), node),
+            ('/ceph/osds/{}/device'.format(osd_id), device),
+            ('/ceph/osds/{}/stats'.format(osd_id), '{}')
+        ])

         # Log it
         logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
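
The old module-level writedata() took the connection plus a path-to-value dict; the new write() is a method taking an ordered list of (path, value) tuples. Continuing the hypothetical ZKHandler sketch above, one plausible implementation backs it with a Kazoo transaction so the keys land atomically; PVC's real semantics may differ:

    def write(self, kvpairs):
        # Apply every (path, value) pair in a single Kazoo transaction,
        # creating keys that are missing and updating ones that exist
        transaction = self.zk_conn.transaction()
        for path, data in kvpairs:
            if self.zk_conn.exists(path):
                transaction.set_data(path, str(data).encode('utf8'))
            else:
                transaction.create(path, str(data).encode('utf8'))
        return transaction.commit()

Note that the list form preserves order, which matters here: the parent key /ceph/osds/{id} is written before its children, so plain create() calls suffice without makepath.
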
@@ -190,7 +189,7 @@ def add_osd(zk_conn, logger, node, device, weight):
         return False

-def remove_osd(zk_conn, logger, osd_id, osd_obj):
+def remove_osd(zkhandler, logger, osd_id, osd_obj):
     logger.out('Removing OSD disk {}'.format(osd_id), state='i')
     try:
         # 1. Verify the OSD is present
@@ -273,7 +272,7 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
         # 7. Delete OSD from ZK
         logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
-        zkhandler.deletekey(zk_conn, '/ceph/osds/{}'.format(osd_id))
+        zkhandler.delete('/ceph/osds/{}'.format(osd_id), recursive=True)

         # Log it
         logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
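
deletekey() similarly becomes a delete() method with an explicit recursive flag, mirroring Kazoo's own delete signature. A minimal sketch, again extending the hypothetical handler, under the assumption that it is a thin wrapper:

    def delete(self, path, recursive=True):
        # Remove the key; with recursive=True its children
        # (node, device, stats, ...) are removed with it
        if self.zk_conn.exists(path):
            self.zk_conn.delete(path, recursive=recursive)
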
@@ -285,14 +284,14 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
 class CephPoolInstance(object):
-    def __init__(self, zk_conn, this_node, name):
-        self.zk_conn = zk_conn
+    def __init__(self, zkhandler, this_node, name):
+        self.zkhandler = zkhandler
         self.this_node = this_node
         self.name = name
         self.pgs = ''
         self.stats = dict()

-        @self.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name))
         def watch_pool_node(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -307,7 +306,7 @@ class CephPoolInstance(object):
             if data and data != self.pgs:
                 self.pgs = data

-        @self.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name))
         def watch_pool_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -324,14 +323,14 @@ class CephPoolInstance(object):
 class CephVolumeInstance(object):
-    def __init__(self, zk_conn, this_node, pool, name):
-        self.zk_conn = zk_conn
+    def __init__(self, zkhandler, this_node, pool, name):
+        self.zkhandler = zkhandler
         self.this_node = this_node
         self.pool = pool
         self.name = name
         self.stats = dict()

-        @self.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name))
         def watch_volume_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -348,15 +347,15 @@ class CephVolumeInstance(object):
 class CephSnapshotInstance(object):
-    def __init__(self, zk_conn, this_node, pool, volume, name):
-        self.zk_conn = zk_conn
+    def __init__(self, zkhandler, this_node, pool, volume, name):
+        self.zkhandler = zkhandler
         self.this_node = this_node
         self.pool = pool
         self.volume = volume
         self.name = name
         self.stats = dict()

-        @self.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name))
+        @self.zkhandler.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name))
         def watch_snapshot_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -374,7 +373,7 @@ class CephSnapshotInstance(object):
 # Primary command function
 # This command pipe is only used for OSD adds and removes
-def run_command(zk_conn, logger, this_node, data, d_osd):
+def run_command(zkhandler, logger, this_node, data, d_osd):
     # Get the command and args
     command, args = data.split()
@@ -383,18 +382,22 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         node, device, weight = args.split(',')
         if node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
+            zk_lock = zkhandler.writelock('/cmd/ceph')
             with zk_lock:
                 # Add the OSD
-                result = add_osd(zk_conn, logger, node, device, weight)
+                result = add_osd(zkhandler, logger, node, device, weight)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
+                    zkhandler.write([
+                        ('/cmd/ceph', 'success-{}'.format(data))
+                    ])
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
+                    zkhandler.write([
+                        ('/cmd/ceph', 'failure-{}'.format(data))
+                    ])
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
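
writelock() also drops the connection argument and returns an object usable with the with statement, which is exactly how Kazoo's lock recipes behave. A sketch assuming it wraps Kazoo's WriteLock recipe on the hypothetical handler:

    def writelock(self, path):
        # Kazoo's WriteLock is a context manager: entering it blocks
        # until the exclusive lock on the given key is acquired
        return self.zk_conn.WriteLock(path)
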
@@ -405,17 +408,21 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
+            zk_lock = zkhandler.writelock('/cmd/ceph')
             with zk_lock:
                 # Remove the OSD
-                result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
+                result = remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
+                    zkhandler.write([
+                        ('/cmd/ceph', 'success-{}'.format(data))
+                    ])
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
+                    zkhandler.write([
+                        ('/cmd/ceph', 'failure-{}'.format(data))
+                    ])
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)


@@ -971,7 +971,7 @@ if enable_storage:
         # Add any missing OSDs to the list
         for osd in new_osd_list:
             if osd not in osd_list:
-                d_osd[osd] = CephInstance.CephOSDInstance(zkhandler.zk_conn, this_node, osd)
+                d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd)

         # Remove any deleted OSDs from the list
         for osd in osd_list:
@@ -991,7 +991,7 @@ if enable_storage:
         # Add any missing Pools to the list
         for pool in new_pool_list:
             if pool not in pool_list:
-                d_pool[pool] = CephInstance.CephPoolInstance(zkhandler.zk_conn, this_node, pool)
+                d_pool[pool] = CephInstance.CephPoolInstance(zkhandler, this_node, pool)
                 d_volume[pool] = dict()
                 volume_list[pool] = []
@@ -1014,7 +1014,7 @@ if enable_storage:
             # Add any missing Volumes to the list
             for volume in new_volume_list:
                 if volume not in volume_list[pool]:
-                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler.zk_conn, this_node, pool, volume)
+                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler, this_node, pool, volume)

             # Remove any deleted Volumes from the list
             for volume in volume_list[pool]:
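
On the daemon side the conversion is mechanical: every CephInstance class now receives the handler itself rather than the raw connection it used to unwrap via zkhandler.zk_conn. A hypothetical wiring sketch using the ZKHandler sketched earlier, with the host string, node object, and OSD IDs stubbed out purely for illustration:

    import pvcnoded.CephInstance as CephInstance

    zkhandler = ZKHandler('zk1:2181,zk2:2181,zk3:2181')  # hypothetical host list
    this_node = None  # stand-in for the daemon's real node object
    d_osd = dict()
    for osd in ('0', '1'):  # made-up OSD IDs
        d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd)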