Implement class-based version of zkhander
This commit is contained in:
parent
3128c8fa70
commit
fef230ad98
|
@ -21,164 +21,232 @@
|
|||
|
||||
import time
|
||||
import uuid
|
||||
from kazoo.client import KazooClient
|
||||
|
||||
|
||||
# Exists function
|
||||
def exists(zk_conn, key):
|
||||
stat = zk_conn.exists(key)
|
||||
if stat:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
class ZKHandler(object):
|
||||
def __init__(self, hosts):
|
||||
"""
|
||||
Initialize an instance of the ZKHandler class with config
|
||||
|
||||
A zk_conn object will be created but not started
|
||||
"""
|
||||
self.encoding = 'utf8'
|
||||
self.zk_conn = KazooClient(hosts=hosts)
|
||||
|
||||
# Child list function
|
||||
def listchildren(zk_conn, key):
|
||||
children = zk_conn.get_children(key)
|
||||
return children
|
||||
#
|
||||
# State/connection management
|
||||
#
|
||||
def connect(self):
|
||||
"""
|
||||
Start the zk_conn object and connect to the cluster
|
||||
"""
|
||||
self.zk_conn.start()
|
||||
|
||||
def disconnect(self):
|
||||
"""
|
||||
Stop and close the zk_conn object and disconnect from the cluster
|
||||
|
||||
# Delete key function
|
||||
def deletekey(zk_conn, key, recursive=True):
|
||||
zk_conn.delete(key, recursive=recursive)
|
||||
The class instance may be reused later (avoids persistent connections)
|
||||
"""
|
||||
self.zk_conn.stop()
|
||||
self.zk_conn.close()
|
||||
|
||||
|
||||
# Rename key recursive function
|
||||
def rename_key_element(zk_conn, zk_transaction, source_key, destination_key):
|
||||
data_raw = zk_conn.get(source_key)
|
||||
data = data_raw[0]
|
||||
zk_transaction.create(destination_key, data)
|
||||
|
||||
if zk_conn.get_children(source_key):
|
||||
for child_key in zk_conn.get_children(source_key):
|
||||
child_source_key = "{}/{}".format(source_key, child_key)
|
||||
child_destination_key = "{}/{}".format(destination_key, child_key)
|
||||
rename_key_element(zk_conn, zk_transaction, child_source_key, child_destination_key)
|
||||
|
||||
zk_transaction.delete(source_key)
|
||||
|
||||
|
||||
# Rename key function
|
||||
def renamekey(zk_conn, kv):
|
||||
# Start up a transaction
|
||||
zk_transaction = zk_conn.transaction()
|
||||
|
||||
# Proceed one KV pair at a time
|
||||
for source_key in sorted(kv):
|
||||
destination_key = kv[source_key]
|
||||
|
||||
# Check if the source key exists or fail out
|
||||
if not zk_conn.exists(source_key):
|
||||
raise
|
||||
# Check if the destination key exists and fail out
|
||||
if zk_conn.exists(destination_key):
|
||||
raise
|
||||
|
||||
rename_key_element(zk_conn, zk_transaction, source_key, destination_key)
|
||||
|
||||
# Commit the transaction
|
||||
try:
|
||||
zk_transaction.commit()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# Data read function
|
||||
def readdata(zk_conn, key):
|
||||
data_raw = zk_conn.get(key)
|
||||
data = data_raw[0].decode('utf8')
|
||||
return data
|
||||
|
||||
|
||||
# Data write function
|
||||
def writedata(zk_conn, kv):
|
||||
# Start up a transaction
|
||||
zk_transaction = zk_conn.transaction()
|
||||
|
||||
# Proceed one KV pair at a time
|
||||
for key in sorted(kv):
|
||||
data = kv[key]
|
||||
|
||||
# Check if this key already exists or not
|
||||
if not zk_conn.exists(key):
|
||||
# We're creating a new key
|
||||
zk_transaction.create(key, str(data).encode('utf8'))
|
||||
#
|
||||
# Key Actions
|
||||
#
|
||||
def exists(self, key):
|
||||
"""
|
||||
Check if a key exists
|
||||
"""
|
||||
stat = self.zk_conn.exists(key)
|
||||
if stat:
|
||||
return True
|
||||
else:
|
||||
# We're updating a key with version validation
|
||||
orig_data = zk_conn.get(key)
|
||||
version = orig_data[1].version
|
||||
return False
|
||||
|
||||
# Set what we expect the new version to be
|
||||
new_version = version + 1
|
||||
def read(self, key):
|
||||
"""
|
||||
Read data from a key
|
||||
"""
|
||||
return self.zk_conn.get(key)[0].decode(self.encoding)
|
||||
|
||||
# Update the data
|
||||
zk_transaction.set_data(key, str(data).encode('utf8'))
|
||||
def write(self, kvpairs):
|
||||
"""
|
||||
Create or update one or more keys' data
|
||||
"""
|
||||
if type(kvpairs) is not list:
|
||||
print("ZKHandler error: Key-value sequence is not a list")
|
||||
return False
|
||||
|
||||
# Set up the check
|
||||
try:
|
||||
zk_transaction.check(key, new_version)
|
||||
except TypeError:
|
||||
print('Zookeeper key "{}" does not match expected version'.format(key))
|
||||
transaction = self.zk_conn.transaction()
|
||||
|
||||
for kvpair in (kvpairs):
|
||||
if type(kvpair) is not tuple:
|
||||
print("ZKHandler error: Key-value pair '{}' is not a tuple".format(kvpair))
|
||||
return False
|
||||
|
||||
# Commit the transaction
|
||||
try:
|
||||
zk_transaction.commit()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
key = kvpair[0]
|
||||
value = kvpair[1]
|
||||
|
||||
if not self.exists(key):
|
||||
# Creating a new key
|
||||
transaction.create(key, str(value).encode(self.encoding))
|
||||
|
||||
# Write lock function
|
||||
def writelock(zk_conn, key):
|
||||
count = 1
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.WriteLock('{}'.format(key), lock_id)
|
||||
break
|
||||
except Exception:
|
||||
count += 1
|
||||
if count > 5:
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
return lock
|
||||
# Updating an existing key
|
||||
data = self.zk_conn.get(key)
|
||||
version = data[1].version
|
||||
|
||||
# Validate the expected version after the execution
|
||||
new_version = version + 1
|
||||
|
||||
# Update the data
|
||||
transaction.set_data(key, str(value).encode(self.encoding))
|
||||
|
||||
# Check the data
|
||||
try:
|
||||
transaction.check(key, new_version)
|
||||
except TypeError:
|
||||
print("ZKHandler error: Key '{}' does not match expected version".format(key))
|
||||
return False
|
||||
|
||||
# Read lock function
|
||||
def readlock(zk_conn, key):
|
||||
count = 1
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.ReadLock('{}'.format(key), lock_id)
|
||||
break
|
||||
except Exception:
|
||||
count += 1
|
||||
if count > 5:
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
return lock
|
||||
transaction.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
print("ZKHandler error: Failed to commit transaction: {}".format(e))
|
||||
return False
|
||||
|
||||
def delete(self, key, recursive=True):
|
||||
"""
|
||||
Delete a key (defaults to recursive)
|
||||
"""
|
||||
if self.zk_conn.delete(key, recursive=recursive):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def children(self, key):
|
||||
"""
|
||||
Lists all children of a key
|
||||
"""
|
||||
return self.zk_conn.get_children(key)
|
||||
|
||||
def rename(self, kkpairs):
|
||||
"""
|
||||
Rename one or more keys to a new value
|
||||
"""
|
||||
if type(kkpairs) is not list:
|
||||
print("ZKHandler error: Key-key sequence is not a list")
|
||||
return False
|
||||
|
||||
transaction = self.zk_conn.transaction()
|
||||
|
||||
def rename_element(transaction, source_key, destnation_key):
|
||||
data = self.zk_conn.get(source_key)[0]
|
||||
transaction.create(destination_key, data)
|
||||
|
||||
if self.children(source_key):
|
||||
for child_key in self.children(source_key):
|
||||
child_source_key = "{}/{}".format(source_key, child_key)
|
||||
child_destination_key = "{}/{}".format(destination_key, child_key)
|
||||
rename_element(transaction, child_source_key, child_destination_key)
|
||||
|
||||
transaction.delete(source_key, recursive=True)
|
||||
|
||||
for kkpair in (kkpairs):
|
||||
if type(kkpair) is not tuple:
|
||||
print("ZKHandler error: Key-key pair '{}' is not a tuple".format(kkpair))
|
||||
return False
|
||||
|
||||
source_key = kkpair[0]
|
||||
destination_key = kkpair[1]
|
||||
|
||||
if not self.exists(source_key):
|
||||
print("ZKHander error: Source key '{}' does not exist".format(source_key))
|
||||
return False
|
||||
if self.exists(destination_key):
|
||||
print("ZKHander error: Destination key '{}' already exists".format(destination_key))
|
||||
return False
|
||||
|
||||
rename_element(transaction, source_key, destination_key)
|
||||
|
||||
# Exclusive lock function
|
||||
def exclusivelock(zk_conn, key):
|
||||
count = 1
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.Lock('{}'.format(key), lock_id)
|
||||
break
|
||||
except Exception:
|
||||
count += 1
|
||||
if count > 5:
|
||||
transaction.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
print("ZKHandler error: Failed to commit transaction: {}".format(e))
|
||||
return False
|
||||
|
||||
#
|
||||
# Lock actions
|
||||
#
|
||||
def readlock(self, key):
|
||||
"""
|
||||
Acquires a read lock on a key
|
||||
"""
|
||||
count = 1
|
||||
lock = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = self.zk_conn.ReadLock(key, lock_id)
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
return lock
|
||||
except Exception as e:
|
||||
if count > 5:
|
||||
print("ZKHandler warning: Failed to acquire read lock after 5 tries: {}".format(e))
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
count += 1
|
||||
continue
|
||||
|
||||
return lock
|
||||
|
||||
def writelock(self, key):
|
||||
"""
|
||||
Acquires a write lock on a key
|
||||
"""
|
||||
count = 1
|
||||
lock = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = self.zk_conn.WriteLock(key, lock_id)
|
||||
break
|
||||
except Exception as e:
|
||||
if count > 5:
|
||||
print("ZKHandler warning: Failed to acquire write lock after 5 tries: {}".format(e))
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
count += 1
|
||||
continue
|
||||
|
||||
return lock
|
||||
|
||||
def exclusivelock(self, key):
|
||||
"""
|
||||
Acquires an exclusive lock on a key
|
||||
"""
|
||||
count = 1
|
||||
lock = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = self.zk_conn.Lock(key, lock_id)
|
||||
break
|
||||
except Exception as e:
|
||||
if count > 5:
|
||||
print("ZKHandler warning: Failed to acquire exclusive lock after 5 tries: {}".format(e))
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
count += 1
|
||||
continue
|
||||
|
||||
return lock
|
||||
|
|
Loading…
Reference in New Issue