diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index 07e59068..1ccdf0e6 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -489,7 +489,7 @@ def zk_listener(state): logger.out('Connection to Zookeeper lost; retrying', state='w') while True: - time.sleep(0.5) + time.sleep(1) _zk_conn = kazoo.client.KazooClient(hosts=config['coordinators']) try: @@ -509,7 +509,7 @@ zk_conn.add_listener(zk_listener) # Cleanup function def cleanup(): - global zk_conn, update_timer, d_domains + global zk_conn, update_timer, d_domain logger.out('Performing final keepalive update', state='s') update_zookeeper() @@ -830,6 +830,12 @@ if enable_networking: d_node[node].update_network_list(d_network) if enable_hypervisor: + # VM command pipeline key + @zk_conn.DataWatch('/cmd/domains') + def cmd(data, stat, event=''): + if data: + VMInstance.run_command(zk_conn, logger, this_node, data.decode('ascii')) + # VM domain objects @zk_conn.ChildrenWatch('/domains') def update_domains(new_domain_list): @@ -855,7 +861,7 @@ if enable_hypervisor: d_node[node].update_domain_list(d_domain) if enable_storage: - # Ceph OSD provisioning key + # Ceph command pipeline key @zk_conn.DataWatch('/ceph/cmd') def cmd(data, stat, event=''): if data: diff --git a/node-daemon/pvcd/VMInstance.py b/node-daemon/pvcd/VMInstance.py index 6b385165..887412d1 100644 --- a/node-daemon/pvcd/VMInstance.py +++ b/node-daemon/pvcd/VMInstance.py @@ -64,6 +64,31 @@ def flush_locks(zk_conn, logger, dom_uuid): continue logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock, rbd), state='o') +# Primary command function +def run_command(zk_conn, logger, this_node, data): + # Get the command and args + command, args = data.split() + + # Flushing VM RBD locks + if command == 'flush_locks': + dom_uuid = args + if this_node.router_state == 'primary': + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains') + with zk_lock: + # Add the OSD + result = flush_locks(zk_conn, logger, dom_uuid) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/cmd/domains': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/cmd/domains': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + class VMInstance(object): # Initialization function def __init__(self, domuuid, zk_conn, config, logger, this_node):