diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 1649c3b9..1c853c2c 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -35,15 +35,33 @@ def exists(zk_conn, key): # Child list function def listchildren(zk_conn, key): - children = zk_conn.get_children(key) - return children + try: + children = zk_conn.get_children(key) + return children + except: + return None -# Delete key function +# Key deletion function def deletekey(zk_conn, key, recursive=True): - zk_conn.delete(key, recursive=recursive) + lock = exclusivelock(zk_conn, key) + lock.acquire() + + try: + zk_conn.delete(key, recursive=recursive) + lock.release() + return True + except: + lock.release() + return False # Rename key recursive function def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): + lock_source = exclusivelock(zk_conn, source_key) + lock_source.acquire() + + lock_destination = exclusivelock(zk_conn, destination_key) + lock_destination.acquire() + data_raw = zk_conn.get(source_key) data = data_raw[0] zk_transaction.create(destination_key, data) @@ -56,6 +74,9 @@ def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): zk_transaction.delete(source_key) + lock_source.release() + lock_destination.release() + # Rename key function def renamekey(zk_conn, kv): # Start up a transaction @@ -83,10 +104,18 @@ def renamekey(zk_conn, kv): # Data read function def readdata(zk_conn, key): - data_raw = zk_conn.get(key) - data = data_raw[0].decode('utf8') - meta = data_raw[1] - return data + lock = readlock(zk_conn, key) + lock.acquire() + + try: + data_raw = zk_conn.get(key) + data = data_raw[0].decode('utf8') + meta = data_raw[1] + + lock.release() + return data + except: + return False # Data write function def writedata(zk_conn, kv): @@ -95,6 +124,9 @@ def writedata(zk_conn, kv): # Proceed one KV pair at a time for key in sorted(kv): + lock = writelock(zk_conn, key) + lock.acquire() + data = kv[key] # Check if this key already exists or not @@ -117,7 +149,9 @@ def writedata(zk_conn, kv): zk_transaction.check(key, new_version) except TypeError: print('Zookeeper key "{}" does not match expected version'.format(key)) + lock.release() return False + lock.release() # Commit the transaction try: @@ -128,12 +162,48 @@ def writedata(zk_conn, kv): # Write lock function def writelock(zk_conn, key): - lock_id = str(uuid.uuid1()) - lock = zk_conn.WriteLock('{}'.format(key), lock_id) + count = 1 + while True: + try: + lock_id = str(uuid.uuid1()) + lock = zk_conn.WriteLock('{}'.format(key), lock_id) + break + except Exception: + count += 1 + if count > 5: + break + else: + continue return lock # Read lock function def readlock(zk_conn, key): - lock_id = str(uuid.uuid1()) - lock = zk_conn.ReadLock('{}'.format(key), lock_id) + count = 1 + while True: + try: + lock_id = str(uuid.uuid1()) + lock = zk_conn.ReadLock('{}'.format(key), lock_id) + break + except Exception: + count += 1 + if count > 5: + break + else: + continue + return lock + +# Exclusive lock function +def exclusivelock(zk_conn, key): + count = 1 + while True: + try: + lock_id = str(uuid.uuid1()) + lock = zk_conn.Lock('{}'.format(key), lock_id) + break + except Exception: + count += 1 + if count > 5: + break + else: + continue return lock