Add pipeline for VM lock flush cmd
This commit is contained in:
parent
d63d9637a5
commit
a2a630f6a0
|
@ -489,7 +489,7 @@ def zk_listener(state):
|
||||||
logger.out('Connection to Zookeeper lost; retrying', state='w')
|
logger.out('Connection to Zookeeper lost; retrying', state='w')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
time.sleep(0.5)
|
time.sleep(1)
|
||||||
|
|
||||||
_zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
|
_zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
|
||||||
try:
|
try:
|
||||||
|
@ -509,7 +509,7 @@ zk_conn.add_listener(zk_listener)
|
||||||
|
|
||||||
# Cleanup function
|
# Cleanup function
|
||||||
def cleanup():
|
def cleanup():
|
||||||
global zk_conn, update_timer, d_domains
|
global zk_conn, update_timer, d_domain
|
||||||
|
|
||||||
logger.out('Performing final keepalive update', state='s')
|
logger.out('Performing final keepalive update', state='s')
|
||||||
update_zookeeper()
|
update_zookeeper()
|
||||||
|
@ -830,6 +830,12 @@ if enable_networking:
|
||||||
d_node[node].update_network_list(d_network)
|
d_node[node].update_network_list(d_network)
|
||||||
|
|
||||||
if enable_hypervisor:
|
if enable_hypervisor:
|
||||||
|
# VM command pipeline key
|
||||||
|
@zk_conn.DataWatch('/cmd/domains')
|
||||||
|
def cmd(data, stat, event=''):
|
||||||
|
if data:
|
||||||
|
VMInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'))
|
||||||
|
|
||||||
# VM domain objects
|
# VM domain objects
|
||||||
@zk_conn.ChildrenWatch('/domains')
|
@zk_conn.ChildrenWatch('/domains')
|
||||||
def update_domains(new_domain_list):
|
def update_domains(new_domain_list):
|
||||||
|
@ -855,7 +861,7 @@ if enable_hypervisor:
|
||||||
d_node[node].update_domain_list(d_domain)
|
d_node[node].update_domain_list(d_domain)
|
||||||
|
|
||||||
if enable_storage:
|
if enable_storage:
|
||||||
# Ceph OSD provisioning key
|
# Ceph command pipeline key
|
||||||
@zk_conn.DataWatch('/ceph/cmd')
|
@zk_conn.DataWatch('/ceph/cmd')
|
||||||
def cmd(data, stat, event=''):
|
def cmd(data, stat, event=''):
|
||||||
if data:
|
if data:
|
||||||
|
|
|
@ -64,6 +64,31 @@ def flush_locks(zk_conn, logger, dom_uuid):
|
||||||
continue
|
continue
|
||||||
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock, rbd), state='o')
|
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock, rbd), state='o')
|
||||||
|
|
||||||
|
# Primary command function
|
||||||
|
def run_command(zk_conn, logger, this_node, data):
|
||||||
|
# Get the command and args
|
||||||
|
command, args = data.split()
|
||||||
|
|
||||||
|
# Flushing VM RBD locks
|
||||||
|
if command == 'flush_locks':
|
||||||
|
dom_uuid = args
|
||||||
|
if this_node.router_state == 'primary':
|
||||||
|
# Lock the command queue
|
||||||
|
zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains')
|
||||||
|
with zk_lock:
|
||||||
|
# Add the OSD
|
||||||
|
result = flush_locks(zk_conn, logger, dom_uuid)
|
||||||
|
# Command succeeded
|
||||||
|
if result:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/cmd/domains': 'success-{}'.format(data)})
|
||||||
|
# Command failed
|
||||||
|
else:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/cmd/domains': 'failure-{}'.format(data)})
|
||||||
|
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
class VMInstance(object):
|
class VMInstance(object):
|
||||||
# Initialization function
|
# Initialization function
|
||||||
def __init__(self, domuuid, zk_conn, config, logger, this_node):
|
def __init__(self, domuuid, zk_conn, config, logger, this_node):
|
||||||
|
|
Loading…
Reference in New Issue