Make zkhandler accept failures more robustly

Most of these would silently fail if there was e.g. an issue with the ZK
connection. Instead, encase things in try blocks and handle the
exceptions in a more graceful way, returning None or False if
applicable. Except for locks, which should retry 5 times before
aborting.
This commit is contained in:
Joshua Boniface 2020-08-17 12:58:26 -04:00
parent 553f96e7ef
commit f9b126a106
1 changed files with 116 additions and 71 deletions

View File

@ -24,55 +24,65 @@ import uuid
# Child list function # Child list function
def listchildren(zk_conn, key): def listchildren(zk_conn, key):
children = zk_conn.get_children(key) try:
return children children = zk_conn.get_children(key)
return children
except Exception:
return None
# Key deletion function # Key deletion function
def deletekey(zk_conn, key, recursive=True): def deletekey(zk_conn, key, recursive=True):
zk_conn.delete(key, recursive=recursive) try:
zk_conn.delete(key, recursive=recursive)
return True
except Exception:
return False
# Data read function # Data read function
def readdata(zk_conn, key): def readdata(zk_conn, key):
data_raw = zk_conn.get(key) try:
data = data_raw[0].decode('utf8') data_raw = zk_conn.get(key)
meta = data_raw[1] data = data_raw[0].decode('utf8')
return data meta = data_raw[1]
return data
except Exception:
return None
# Data write function # Data write function
def writedata(zk_conn, kv): def writedata(zk_conn, kv):
# Start up a transaction
zk_transaction = zk_conn.transaction()
# Proceed one KV pair at a time
for key in sorted(kv):
data = kv[key]
if not data:
data = ''
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, str(data).encode('utf8'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
version = orig_data[1].version
# Set what we expect the new version to be
new_version = version + 1
# Update the data
zk_transaction.set_data(key, str(data).encode('utf8'))
# Set up the check
try:
zk_transaction.check(key, new_version)
except TypeError:
print('Zookeeper key "{}" does not match expected version'.format(key))
return False
# Commit the transaction # Commit the transaction
try: try:
# Start up a transaction
zk_transaction = zk_conn.transaction()
# Proceed one KV pair at a time
for key in sorted(kv):
data = kv[key]
if not data:
data = ''
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, str(data).encode('utf8'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
version = orig_data[1].version
# Set what we expect the new version to be
new_version = version + 1
# Update the data
zk_transaction.set_data(key, str(data).encode('utf8'))
# Set up the check
try:
zk_transaction.check(key, new_version)
except TypeError:
print('Zookeeper key "{}" does not match expected version'.format(key))
return False
zk_transaction.commit() zk_transaction.commit()
return True return True
except Exception: except Exception:
@ -84,54 +94,89 @@ def renamekey(zk_conn, kv):
# support either the recursive delete or recursive create operations that # support either the recursive delete or recursive create operations that
# we need. Why? No explanation in the docs that I can find. # we need. Why? No explanation in the docs that I can find.
# Proceed one KV pair at a time try:
for key in sorted(kv): # Proceed one KV pair at a time
old_name = key for key in sorted(kv):
new_name = kv[key] old_name = key
new_name = kv[key]
old_data = zk_conn.get(old_name)[0] old_data = zk_conn.get(old_name)[0]
# Find the children of old_name recursively # Find the children of old_name recursively
child_keys = list() child_keys = list()
def get_children(key): def get_children(key):
children = zk_conn.get_children(key) children = zk_conn.get_children(key)
if not children: if not children:
child_keys.append(key) child_keys.append(key)
else: else:
for ckey in children: for ckey in children:
get_children('{}/{}'.format(key, ckey)) get_children('{}/{}'.format(key, ckey))
get_children(old_name) get_children(old_name)
# Get the data out of each of the child keys # Get the data out of each of the child keys
child_data = dict() child_data = dict()
for ckey in child_keys: for ckey in child_keys:
child_data[ckey] = zk_conn.get(ckey)[0] child_data[ckey] = zk_conn.get(ckey)[0]
# Create the new parent key # Create the new parent key
zk_conn.create(new_name, old_data, makepath=True) zk_conn.create(new_name, old_data, makepath=True)
# For each child key, create the key and add the data # For each child key, create the key and add the data
for ckey in child_keys: for ckey in child_keys:
new_ckey_name = ckey.replace(old_name, new_name) new_ckey_name = ckey.replace(old_name, new_name)
zk_conn.create(new_ckey_name, child_data[ckey], makepath=True) zk_conn.create(new_ckey_name, child_data[ckey], makepath=True)
# Remove recursively the old key # Remove recursively the old key
zk_conn.delete(old_name, recursive=True) zk_conn.delete(old_name, recursive=True)
return True
except Exception:
return False
# Write lock function # Write lock function
def writelock(zk_conn, key): def writelock(zk_conn, key):
lock_id = str(uuid.uuid1()) count = 1
lock = zk_conn.WriteLock('{}'.format(key), lock_id) while True:
try:
lock_id = str(uuid.uuid1())
lock = zk_conn.WriteLock('{}'.format(key), lock_id)
break
except Exception:
count += 1
if count > 5:
break
else:
continue
return lock return lock
# Read lock function # Read lock function
def readlock(zk_conn, key): def readlock(zk_conn, key):
lock_id = str(uuid.uuid1()) count = 1
lock = zk_conn.ReadLock('{}'.format(key), lock_id) while True:
try:
lock_id = str(uuid.uuid1())
lock = zk_conn.ReadLock('{}'.format(key), lock_id)
break
except Exception:
count += 1
if count > 5:
break
else:
continue
return lock return lock
# Exclusive lock function # Exclusive lock function
def exclusivelock(zk_conn, key): def exclusivelock(zk_conn, key):
lock_id = str(uuid.uuid1()) count = 1
lock = zk_conn.Lock('{}'.format(key), lock_id) while True:
try:
lock_id = str(uuid.uuid1())
lock = zk_conn.Lock('{}'.format(key), lock_id)
break
except Exception:
count += 1
if count > 5:
break
else:
continue
return lock return lock