diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index ac2c1f1c..57a2d83f 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -824,18 +824,36 @@ def update_primary(new_primary, stat, event=''): new_primary = new_primary.decode('ascii') except AttributeError: new_primary = 'none' + key_version = stat.version if new_primary != this_node.primary_node: if config['daemon_mode'] == 'coordinator': # We're a coordinator and there is no primary if new_primary == 'none': - if this_node.daemon_state == 'run' and this_node.router_state != 'primary': + if this_node.daemon_state == 'run' and this_node.router_state == 'secondary': logger.out('Contending for primary coordinator state', state='i') - zkhandler.writedata(zk_conn, {'/primary_node': myhostname}) + # Acquire an exclusive lock on the primary_node key + primary_lock = zkhandler.exclusivelock(zk_conn, '/primary_node') + try: + # This lock times out after 0.4s, which is 0.1s less than the pre-takeover + # timeout below, thus ensuring that a primary takeover will not deadlock + # against a node that failed the contention + primary_lock.acquire(timeout=0.4) + # Ensure when we get the lock that the versions are still consistent and that + # another node hasn't already acquired primary state + if key_version == zk_conn.get('/primary_node')[1].version: + zkhandler.writedata(zk_conn, {'/primary_node': myhostname}) + # Cleanly release the lock + primary_lock.release() + # We timed out acquiring a lock, which means we failed contention, so just pass + except kazoo.exceptions.LockTimeout: + pass elif new_primary == myhostname: + time.sleep(0.5) zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'takeover'}) else: - if this_node.router_state != 'secondary': + if this_node.router_state == 'primary': + time.sleep(0.5) zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'relinquish'}) else: zkhandler.writedata(zk_conn, 
# Exclusive lock function
def exclusivelock(zk_conn, key):
    """
    Build an exclusive lock object for the given key.

    A new UUID1 string is generated on every call and handed to
    zk_conn.Lock as the second argument, after the key rendered as a
    string path. The lock object is returned as-is; it is not acquired
    here, so the caller is responsible for acquire() and release().
    """
    identifier = str(uuid.uuid1())
    return zk_conn.Lock('{}'.format(key), identifier)