Convert CephInstance to new ZK schema handler
parent f913f42a6d
commit 610f6e8f2c
@@ -35,7 +35,7 @@ class CephOSDInstance(object):
         self.size = None
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
         def watch_osd_node(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
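The pattern repeated throughout this commit: hardcoded ZooKeeper path strings like '/ceph/osds/{}/node'.format(...) become lookups through the new schema handler. The class below is only a hypothetical sketch of how such a dotted-key resolver could be laid out; the key names and path templates are taken from this diff, but the ZKSchema internals are an assumption, not the actual pvcd implementation:

    # Hypothetical sketch of a dotted-key-to-path resolver; the real
    # ZKSchema handler may differ. Keys and templates come from this diff.
    class ZKSchema:
        _paths = {
            'base.cmd.ceph': '/cmd/ceph',
            'osd': '/ceph/osds/{}',
            'osd.node': '/ceph/osds/{}/node',
            'osd.device': '/ceph/osds/{}/device',
            'osd.stats': '/ceph/osds/{}/stats',
            'pool.pgs': '/ceph/pools/{}/pgs',
            'pool.stats': '/ceph/pools/{}/stats',
            'volume.stats': '/ceph/volumes/{}/stats',
            'snapshot.stats': '/ceph/snapshots/{}/stats',
        }

        def path(self, key, item=None):
            # Resolve a schema key plus optional item ID to a literal path,
            # e.g. path('osd.node', '0') -> '/ceph/osds/0/node'
            template = self._paths[key]
            return template.format(item) if item is not None else template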
@@ -50,7 +50,7 @@ class CephOSDInstance(object):
             if data and data != self.node:
                 self.node = data

-        @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
         def watch_osd_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -174,10 +174,10 @@ def add_osd(zkhandler, logger, node, device, weight):
         # 7. Add the new OSD to the list
         logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
         zkhandler.write([
-            ('/ceph/osds/{}'.format(osd_id), ''),
-            ('/ceph/osds/{}/node'.format(osd_id), node),
-            ('/ceph/osds/{}/device'.format(osd_id), device),
-            ('/ceph/osds/{}/stats'.format(osd_id), '{}')
+            (('osd', osd_id), ''),
+            (('osd.node', osd_id), node),
+            (('osd.device', osd_id), device),
+            (('osd.stats', osd_id), '{}'),
         ])

         # Log it
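Note the change in key format passed to zkhandler.write(): literal path strings become ('schema.key', item) tuples. A minimal sketch of how a write wrapper could accept both forms, assuming the ZKSchema sketch above (resolve_key is a hypothetical helper, not PVC's API):

    # Hypothetical helper: accept either a literal '/path' string or a
    # ('schema.key', item) tuple and return the literal ZooKeeper path.
    def resolve_key(schema, key):
        if isinstance(key, tuple):
            return schema.path(*key)  # ('osd.node', '0') -> '/ceph/osds/0/node'
        return key

zkhandler.write() would then resolve each key this way before adding the corresponding set operation to its ZooKeeper transaction.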
@@ -272,7 +272,7 @@ def remove_osd(zkhandler, logger, osd_id, osd_obj):

         # 7. Delete OSD from ZK
         logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
-        zkhandler.delete('/ceph/osds/{}'.format(osd_id), recursive=True)
+        zkhandler.delete(('osd', osd_id), recursive=True)

         # Log it
         logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
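The delete call follows the same convention. A sketch mirroring resolve_key above, assuming kazoo's recursive delete underneath:

    # Hypothetical sketch: schema-aware recursive delete on top of kazoo,
    # which removes the node and all of its children when recursive=True.
    def delete(zk_conn, schema, key, recursive=True):
        path = resolve_key(schema, key)  # ('osd', '0') -> '/ceph/osds/0'
        zk_conn.delete(path, recursive=recursive)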
@@ -291,7 +291,7 @@ class CephPoolInstance(object):
         self.pgs = ''
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name))
         def watch_pool_node(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -306,7 +306,7 @@ class CephPoolInstance(object):
             if data and data != self.pgs:
                 self.pgs = data

-        @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name))
         def watch_pool_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -330,7 +330,7 @@ class CephVolumeInstance(object):
         self.name = name
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}'))
         def watch_volume_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -355,7 +355,7 @@ class CephSnapshotInstance(object):
         self.name = name
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name))
+        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}'))
         def watch_snapshot_stats(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
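All five watcher conversions share one constraint: kazoo's DataWatch decorator takes a literal path string, so schema.path() has to be evaluated at decoration time rather than passing the dotted key through. A standalone kazoo example (not PVC code; the host and path are placeholders) showing the watcher life cycle these classes rely on:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')  # placeholder host
    zk.start()

    # What schema.path('osd.node', '0') would resolve to in the sketch above
    osd_node_path = '/ceph/osds/0/node'

    @zk.DataWatch(osd_node_path)
    def watch_osd_node(data, stat, event=None):
        # kazoo calls this with event=None on registration, then with a
        # WatchedEvent on each change; event.type == 'DELETED' means the
        # key was removed after existing
        if event and event.type == 'DELETED':
            return False  # returning False terminates this watcher
        if data is not None:
            print('osd node is now', data.decode())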
@@ -382,7 +382,7 @@ def run_command(zkhandler, logger, this_node, data, d_osd):
         node, device, weight = args.split(',')
         if node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock('/cmd/ceph')
+            zk_lock = zkhandler.writelock('base.cmd.ceph')
             with zk_lock:
                 # Add the OSD
                 result = add_osd(zkhandler, logger, node, device, weight)
@@ -390,13 +390,13 @@ def run_command(zkhandler, logger, this_node, data, d_osd):
                 if result:
                     # Update the command queue
                     zkhandler.write([
-                        ('/cmd/ceph', 'success-{}'.format(data))
+                        ('base.cmd.ceph', 'success-{}'.format(data))
                     ])
                 # Command failed
                 else:
                     # Update the command queue
                     zkhandler.write([
-                        ('/cmd/ceph', 'failure-{}'.format(data))
+                        ('base.cmd.ceph', 'failure-{}'.format(data))
                     ])
                 # Wait 1 second before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
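The command-queue lock moves to the same key scheme: writelock('/cmd/ceph') becomes writelock('base.cmd.ceph'). A hedged sketch of what such a wrapper could look like on top of kazoo's WriteLock recipe (the function shape and identifier choice are assumptions, not PVC's actual implementation):

    import uuid
    from kazoo.recipe.lock import WriteLock

    # Hypothetical sketch: resolve the schema key to its path and return
    # a kazoo WriteLock on it; 'base.cmd.ceph' -> '/cmd/ceph' per the
    # ZKSchema sketch above.
    def writelock(zk_conn, schema, key):
        return WriteLock(zk_conn, schema.path(key), identifier=str(uuid.uuid1()))

The with zk_lock: block then holds the lock for the duration of the OSD operation, and the one-second sleep before release gives the waiting client time to contend for the lock.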
@@ -408,7 +408,7 @@ def run_command(zkhandler, logger, this_node, data, d_osd):
         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock('/cmd/ceph')
+            zk_lock = zkhandler.writelock('base.cmd.ceph')
             with zk_lock:
                 # Remove the OSD
                 result = remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
@@ -416,13 +416,13 @@ def run_command(zkhandler, logger, this_node, data, d_osd):
                 if result:
                     # Update the command queue
                     zkhandler.write([
-                        ('/cmd/ceph', 'success-{}'.format(data))
+                        ('base.cmd.ceph', 'success-{}'.format(data))
                     ])
                 # Command failed
                 else:
                     # Update the command queue
                     zkhandler.write([
-                        ('/cmd/ceph', 'failure-{}'.format(data))
+                        ('base.cmd.ceph', 'failure-{}'.format(data))
                    ])
                 # Wait 1 second before we free the lock, to ensure the client hits the lock
                 time.sleep(1)