Integrate schema handling within ZKHandler

Abstracts away the schema management, especially when doing actions, to
prevent duplication in other areas.
This commit is contained in:
Joshua Boniface 2021-06-09 13:23:57 -04:00
parent 76c37e6628
commit a9a57533a7
1 changed files with 77 additions and 32 deletions

View File

@ -49,6 +49,7 @@ class ZKConnection(object):
def connection(*args, **kwargs):
zkhandler = ZKHandler(self.config)
zkhandler.connect()
zkhandler.schema.load(zkhandler.read(zkhandler.schema.path('base.schema.version')), quiet=True)
ret = function(zkhandler, *args, **kwargs)
@ -87,11 +88,14 @@ class ZKHandler(object):
Initialize an instance of the ZKHandler class with config
A zk_conn object will be created but not started
A ZKSchema instance will be created
"""
self.encoding = 'utf8'
self.coordinators = config['coordinators']
self.logger = logger
self.zk_conn = KazooClient(hosts=self.coordinators)
self._schema = ZKSchema()
#
# Class meta-functions
@ -105,6 +109,13 @@ class ZKHandler(object):
else:
print(message)
#
# Properties
#
@property
def schema(self):
return self._schema
#
# State/connection management
#
@ -130,7 +141,7 @@ class ZKHandler(object):
def connect(self, persistent=False):
"""
Start the zk_conn object and connect to the cluster
Start the zk_conn object and connect to the cluster, then load the current schema version
"""
try:
self.zk_conn.start()
@ -148,6 +159,23 @@ class ZKHandler(object):
self.zk_conn.stop()
self.zk_conn.close()
#
# Schema helper actions
#
def get_schema_path(self, key):
if isinstance(key, tuple):
# This is a key tuple with both an ipath and an item
ipath, item = key
elif isinstance(key, str):
# This is a key string with just an ipath
ipath = key
item = None
else:
# This is an invalid key
return None
return self.schema.path(ipath, item=item)
#
# Key Actions
#
@ -155,7 +183,8 @@ class ZKHandler(object):
"""
Check if a key exists
"""
stat = self.zk_conn.exists(key)
path = self.get_schema_path(key)
stat = self.zk_conn.exists(path)
if stat:
return True
else:
@ -165,7 +194,8 @@ class ZKHandler(object):
"""
Read data from a key
"""
return self.zk_conn.get(key)[0].decode(self.encoding)
path = self.get_schema_path(key)
return self.zk_conn.get(path)[0].decode(self.encoding)
def write(self, kvpairs):
"""
@ -185,26 +215,28 @@ class ZKHandler(object):
key = kvpair[0]
value = kvpair[1]
if not self.exists(key):
path = self.get_schema_path(key)
if not self.exists(path):
# Creating a new key
transaction.create(key, str(value).encode(self.encoding))
transaction.create(path, str(value).encode(self.encoding))
else:
# Updating an existing key
data = self.zk_conn.get(key)
data = self.zk_conn.get(path)
version = data[1].version
# Validate the expected version after the execution
new_version = version + 1
# Update the data
transaction.set_data(key, str(value).encode(self.encoding))
transaction.set_data(path, str(value).encode(self.encoding))
# Check the data
try:
transaction.check(key, new_version)
transaction.check(path, new_version)
except TypeError:
self.log("ZKHandler error: Key '{}' does not match expected version".format(key), state='e')
self.log("ZKHandler error: Key '{}' does not match expected version".format(path), state='e')
return False
try:
@ -222,11 +254,12 @@ class ZKHandler(object):
keys = [keys]
for key in keys:
if self.exists(key):
path = self.get_schema_path(key)
if self.exists(path):
try:
self.zk_conn.delete(key, recursive=recursive)
self.zk_conn.delete(path, recursive=recursive)
except Exception as e:
self.log("ZKHandler error: Failed to delete key {}: {}".format(key, e), state='e')
self.log("ZKHandler error: Failed to delete key {}: {}".format(path, e), state='e')
return False
return True
@ -235,7 +268,8 @@ class ZKHandler(object):
"""
Lists all children of a key
"""
return self.zk_conn.get_children(key)
path = self.get_schema_path(key)
return self.zk_conn.get_children(path)
def rename(self, kkpairs):
"""
@ -247,17 +281,17 @@ class ZKHandler(object):
transaction = self.zk_conn.transaction()
def rename_element(transaction, source_key, destnation_key):
data = self.zk_conn.get(source_key)[0]
transaction.create(destination_key, data)
def rename_element(transaction, source_path, destnation_path):
data = self.zk_conn.get(source_path)[0]
transaction.create(destination_path, data)
if self.children(source_key):
for child_key in self.children(source_key):
child_source_key = "{}/{}".format(source_key, child_key)
child_destination_key = "{}/{}".format(destination_key, child_key)
rename_element(transaction, child_source_key, child_destination_key)
if self.children(source_path):
for child_path in self.children(source_path):
child_source_path = "{}/{}".format(source_path, child_path)
child_destination_path = "{}/{}".format(destination_path, child_path)
rename_element(transaction, child_source_path, child_destination_path)
transaction.delete(source_key)
transaction.delete(source_path)
for kkpair in (kkpairs):
if type(kkpair) is not tuple:
@ -265,16 +299,19 @@ class ZKHandler(object):
return False
source_key = kkpair[0]
source_path = self.get_schema_path(source_key)
destination_key = kkpair[1]
destination_path = self.get_schema_path(destination_key)
if not self.exists(source_key):
self.log("ZKHander error: Source key '{}' does not exist".format(source_key), state='e')
if not self.exists(source_path):
self.log("ZKHander error: Source key '{}' does not exist".format(source_path), state='e')
return False
if self.exists(destination_key):
self.log("ZKHander error: Destination key '{}' already exists".format(destination_key), state='e')
if self.exists(destination_path):
self.log("ZKHander error: Destination key '{}' already exists".format(destination_path), state='e')
return False
rename_element(transaction, source_key, destination_key)
rename_element(transaction, source_path, destination_path)
try:
transaction.commit()
@ -293,10 +330,12 @@ class ZKHandler(object):
count = 1
lock = None
path = self.get_schema_path(key)
while True:
try:
lock_id = str(uuid.uuid1())
lock = self.zk_conn.ReadLock(key, lock_id)
lock = self.zk_conn.ReadLock(path, lock_id)
break
except Exception as e:
if count > 5:
@ -316,10 +355,12 @@ class ZKHandler(object):
count = 1
lock = None
path = self.get_schema_path(key)
while True:
try:
lock_id = str(uuid.uuid1())
lock = self.zk_conn.WriteLock(key, lock_id)
lock = self.zk_conn.WriteLock(path, lock_id)
break
except Exception as e:
if count > 5:
@ -339,10 +380,12 @@ class ZKHandler(object):
count = 1
lock = None
path = self.get_schema_path(key)
while True:
try:
lock_id = str(uuid.uuid1())
lock = self.zk_conn.Lock(key, lock_id)
lock = self.zk_conn.Lock(path, lock_id)
break
except Exception as e:
if count > 5:
@ -537,8 +580,10 @@ class ZKSchema(object):
return False
# Load the schema of a given version from a file
def load(self, version):
print(f'Loading schema version {version}')
def load(self, version, quiet=False):
if not quiet:
print(f'Loading schema version {version}')
with open(f'daemon_lib/migrations/versions/{version}.json', 'r') as sfh:
self.schema = json.load(sfh)
self.version = self.schema.get('version')