Add locking in common zkhander
Ensures that every changed made here is locked, thus preventing duplicate updates, etc.
This commit is contained in:
		| @@ -35,15 +35,33 @@ def exists(zk_conn, key): | |||||||
|  |  | ||||||
| # Child list function | # Child list function | ||||||
| def listchildren(zk_conn, key): | def listchildren(zk_conn, key): | ||||||
|     children = zk_conn.get_children(key) |     try: | ||||||
|     return children |         children = zk_conn.get_children(key) | ||||||
|  |         return children | ||||||
|  |     except: | ||||||
|  |         return None | ||||||
|  |  | ||||||
| # Delete key function | # Key deletion function | ||||||
| def deletekey(zk_conn, key, recursive=True): | def deletekey(zk_conn, key, recursive=True): | ||||||
|     zk_conn.delete(key, recursive=recursive) |     lock = exclusivelock(zk_conn, key) | ||||||
|  |     lock.acquire() | ||||||
|  |  | ||||||
|  |     try: | ||||||
|  |         zk_conn.delete(key, recursive=recursive) | ||||||
|  |         lock.release() | ||||||
|  |         return True | ||||||
|  |     except: | ||||||
|  |         lock.release() | ||||||
|  |         return False | ||||||
|  |  | ||||||
| # Rename key recursive function | # Rename key recursive function | ||||||
| def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): | def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): | ||||||
|  |     lock_source = exclusivelock(zk_conn, source_key) | ||||||
|  |     lock_source.acquire() | ||||||
|  |  | ||||||
|  |     lock_destination = exclusivelock(zk_conn, destination_key) | ||||||
|  |     lock_destination.acquire() | ||||||
|  |  | ||||||
|     data_raw = zk_conn.get(source_key) |     data_raw = zk_conn.get(source_key) | ||||||
|     data = data_raw[0] |     data = data_raw[0] | ||||||
|     zk_transaction.create(destination_key, data) |     zk_transaction.create(destination_key, data) | ||||||
| @@ -56,6 +74,9 @@ def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): | |||||||
|  |  | ||||||
|     zk_transaction.delete(source_key) |     zk_transaction.delete(source_key) | ||||||
|  |  | ||||||
|  |     lock_source.release() | ||||||
|  |     lock_destination.release() | ||||||
|  |  | ||||||
| # Rename key function | # Rename key function | ||||||
| def renamekey(zk_conn, kv): | def renamekey(zk_conn, kv): | ||||||
|     # Start up a transaction |     # Start up a transaction | ||||||
| @@ -83,10 +104,18 @@ def renamekey(zk_conn, kv): | |||||||
|  |  | ||||||
| # Data read function | # Data read function | ||||||
| def readdata(zk_conn, key): | def readdata(zk_conn, key): | ||||||
|     data_raw = zk_conn.get(key) |     lock = readlock(zk_conn, key) | ||||||
|     data = data_raw[0].decode('utf8') |     lock.acquire() | ||||||
|     meta = data_raw[1] |  | ||||||
|     return data |     try: | ||||||
|  |         data_raw = zk_conn.get(key) | ||||||
|  |         data = data_raw[0].decode('utf8') | ||||||
|  |         meta = data_raw[1] | ||||||
|  |  | ||||||
|  |         lock.release() | ||||||
|  |         return data | ||||||
|  |     except: | ||||||
|  |         return False | ||||||
|  |  | ||||||
| # Data write function | # Data write function | ||||||
| def writedata(zk_conn, kv): | def writedata(zk_conn, kv): | ||||||
| @@ -95,6 +124,9 @@ def writedata(zk_conn, kv): | |||||||
|  |  | ||||||
|     # Proceed one KV pair at a time |     # Proceed one KV pair at a time | ||||||
|     for key in sorted(kv): |     for key in sorted(kv): | ||||||
|  |         lock = writelock(zk_conn, key) | ||||||
|  |         lock.acquire() | ||||||
|  |  | ||||||
|         data = kv[key] |         data = kv[key] | ||||||
|  |  | ||||||
|         # Check if this key already exists or not |         # Check if this key already exists or not | ||||||
| @@ -117,7 +149,9 @@ def writedata(zk_conn, kv): | |||||||
|                 zk_transaction.check(key, new_version) |                 zk_transaction.check(key, new_version) | ||||||
|             except TypeError: |             except TypeError: | ||||||
|                 print('Zookeeper key "{}" does not match expected version'.format(key)) |                 print('Zookeeper key "{}" does not match expected version'.format(key)) | ||||||
|  |                 lock.release() | ||||||
|                 return False |                 return False | ||||||
|  |         lock.release() | ||||||
|  |  | ||||||
|     # Commit the transaction |     # Commit the transaction | ||||||
|     try: |     try: | ||||||
| @@ -128,12 +162,48 @@ def writedata(zk_conn, kv): | |||||||
|  |  | ||||||
| # Write lock function | # Write lock function | ||||||
| def writelock(zk_conn, key): | def writelock(zk_conn, key): | ||||||
|     lock_id = str(uuid.uuid1()) |     count = 1 | ||||||
|     lock = zk_conn.WriteLock('{}'.format(key), lock_id) |     while True: | ||||||
|  |         try: | ||||||
|  |             lock_id = str(uuid.uuid1()) | ||||||
|  |             lock = zk_conn.WriteLock('{}'.format(key), lock_id) | ||||||
|  |             break | ||||||
|  |         except Exception: | ||||||
|  |             count += 1 | ||||||
|  |             if count > 5: | ||||||
|  |                 break | ||||||
|  |             else: | ||||||
|  |                 continue | ||||||
|     return lock |     return lock | ||||||
|  |  | ||||||
| # Read lock function | # Read lock function | ||||||
| def readlock(zk_conn, key): | def readlock(zk_conn, key): | ||||||
|     lock_id = str(uuid.uuid1()) |     count = 1 | ||||||
|     lock = zk_conn.ReadLock('{}'.format(key), lock_id) |     while True: | ||||||
|  |         try: | ||||||
|  |             lock_id = str(uuid.uuid1()) | ||||||
|  |             lock = zk_conn.ReadLock('{}'.format(key), lock_id) | ||||||
|  |             break | ||||||
|  |         except Exception: | ||||||
|  |             count += 1 | ||||||
|  |             if count > 5: | ||||||
|  |                 break | ||||||
|  |             else: | ||||||
|  |                 continue | ||||||
|  |     return lock | ||||||
|  |  | ||||||
|  | # Exclusive lock function | ||||||
|  | def exclusivelock(zk_conn, key): | ||||||
|  |     count = 1 | ||||||
|  |     while True: | ||||||
|  |         try: | ||||||
|  |             lock_id = str(uuid.uuid1()) | ||||||
|  |             lock = zk_conn.Lock('{}'.format(key), lock_id) | ||||||
|  |             break | ||||||
|  |         except Exception: | ||||||
|  |             count += 1 | ||||||
|  |             if count > 5: | ||||||
|  |                 break | ||||||
|  |             else: | ||||||
|  |                 continue | ||||||
|     return lock |     return lock | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user