From a9a57533a7b515a61ad60e30d465ca8969afe0df Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Wed, 9 Jun 2021 13:23:57 -0400 Subject: [PATCH] Integrate schema handling within ZKHandler Abstracts away the schema management, especially when doing actions, to prevent duplication in other areas. --- daemon-common/zkhandler.py | 109 ++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 32 deletions(-) diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index c7de2cc2..b36eef49 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -49,6 +49,7 @@ class ZKConnection(object): def connection(*args, **kwargs): zkhandler = ZKHandler(self.config) zkhandler.connect() + zkhandler.schema.load(zkhandler.read(zkhandler.schema.path('base.schema.version')), quiet=True) ret = function(zkhandler, *args, **kwargs) @@ -87,11 +88,14 @@ class ZKHandler(object): Initialize an instance of the ZKHandler class with config A zk_conn object will be created but not started + + A ZKSchema instance will be created """ self.encoding = 'utf8' self.coordinators = config['coordinators'] self.logger = logger self.zk_conn = KazooClient(hosts=self.coordinators) + self._schema = ZKSchema() # # Class meta-functions @@ -105,6 +109,13 @@ class ZKHandler(object): else: print(message) + # + # Properties + # + @property + def schema(self): + return self._schema + # # State/connection management # @@ -130,7 +141,7 @@ class ZKHandler(object): def connect(self, persistent=False): """ - Start the zk_conn object and connect to the cluster + Start the zk_conn object and connect to the cluster, then load the current schema version """ try: self.zk_conn.start() @@ -148,6 +159,23 @@ class ZKHandler(object): self.zk_conn.stop() self.zk_conn.close() + # + # Schema helper actions + # + def get_schema_path(self, key): + if isinstance(key, tuple): + # This is a key tuple with both an ipath and an item + ipath, item = key + elif isinstance(key, str): + # This is a key string with just an ipath + ipath = key + item = None + else: + # This is an invalid key + return None + + return self.schema.path(ipath, item=item) + # # Key Actions # @@ -155,7 +183,8 @@ class ZKHandler(object): """ Check if a key exists """ - stat = self.zk_conn.exists(key) + path = self.get_schema_path(key) + stat = self.zk_conn.exists(path) if stat: return True else: @@ -165,7 +194,8 @@ class ZKHandler(object): """ Read data from a key """ - return self.zk_conn.get(key)[0].decode(self.encoding) + path = self.get_schema_path(key) + return self.zk_conn.get(path)[0].decode(self.encoding) def write(self, kvpairs): """ @@ -185,26 +215,28 @@ class ZKHandler(object): key = kvpair[0] value = kvpair[1] - if not self.exists(key): + path = self.get_schema_path(key) + + if not self.exists(path): # Creating a new key - transaction.create(key, str(value).encode(self.encoding)) + transaction.create(path, str(value).encode(self.encoding)) else: # Updating an existing key - data = self.zk_conn.get(key) + data = self.zk_conn.get(path) version = data[1].version # Validate the expected version after the execution new_version = version + 1 # Update the data - transaction.set_data(key, str(value).encode(self.encoding)) + transaction.set_data(path, str(value).encode(self.encoding)) # Check the data try: - transaction.check(key, new_version) + transaction.check(path, new_version) except TypeError: - self.log("ZKHandler error: Key '{}' does not match expected version".format(key), state='e') + self.log("ZKHandler error: Key '{}' does not match expected version".format(path), state='e') return False try: @@ -222,11 +254,12 @@ class ZKHandler(object): keys = [keys] for key in keys: - if self.exists(key): + path = self.get_schema_path(key) + if self.exists(path): try: - self.zk_conn.delete(key, recursive=recursive) + self.zk_conn.delete(path, recursive=recursive) except Exception as e: - self.log("ZKHandler error: Failed to delete key {}: {}".format(key, e), state='e') + self.log("ZKHandler error: Failed to delete key {}: {}".format(path, e), state='e') return False return True @@ -235,7 +268,8 @@ class ZKHandler(object): """ Lists all children of a key """ - return self.zk_conn.get_children(key) + path = self.get_schema_path(key) + return self.zk_conn.get_children(path) def rename(self, kkpairs): """ @@ -247,17 +281,17 @@ class ZKHandler(object): transaction = self.zk_conn.transaction() - def rename_element(transaction, source_key, destnation_key): - data = self.zk_conn.get(source_key)[0] - transaction.create(destination_key, data) + def rename_element(transaction, source_path, destnation_path): + data = self.zk_conn.get(source_path)[0] + transaction.create(destination_path, data) - if self.children(source_key): - for child_key in self.children(source_key): - child_source_key = "{}/{}".format(source_key, child_key) - child_destination_key = "{}/{}".format(destination_key, child_key) - rename_element(transaction, child_source_key, child_destination_key) + if self.children(source_path): + for child_path in self.children(source_path): + child_source_path = "{}/{}".format(source_path, child_path) + child_destination_path = "{}/{}".format(destination_path, child_path) + rename_element(transaction, child_source_path, child_destination_path) - transaction.delete(source_key) + transaction.delete(source_path) for kkpair in (kkpairs): if type(kkpair) is not tuple: @@ -265,16 +299,19 @@ class ZKHandler(object): return False source_key = kkpair[0] + source_path = self.get_schema_path(source_key) + destination_key = kkpair[1] + destination_path = self.get_schema_path(destination_key) - if not self.exists(source_key): - self.log("ZKHander error: Source key '{}' does not exist".format(source_key), state='e') + if not self.exists(source_path): + self.log("ZKHander error: Source key '{}' does not exist".format(source_path), state='e') return False - if self.exists(destination_key): - self.log("ZKHander error: Destination key '{}' already exists".format(destination_key), state='e') + if self.exists(destination_path): + self.log("ZKHander error: Destination key '{}' already exists".format(destination_path), state='e') return False - rename_element(transaction, source_key, destination_key) + rename_element(transaction, source_path, destination_path) try: transaction.commit() @@ -293,10 +330,12 @@ class ZKHandler(object): count = 1 lock = None + path = self.get_schema_path(key) + while True: try: lock_id = str(uuid.uuid1()) - lock = self.zk_conn.ReadLock(key, lock_id) + lock = self.zk_conn.ReadLock(path, lock_id) break except Exception as e: if count > 5: @@ -316,10 +355,12 @@ class ZKHandler(object): count = 1 lock = None + path = self.get_schema_path(key) + while True: try: lock_id = str(uuid.uuid1()) - lock = self.zk_conn.WriteLock(key, lock_id) + lock = self.zk_conn.WriteLock(path, lock_id) break except Exception as e: if count > 5: @@ -339,10 +380,12 @@ class ZKHandler(object): count = 1 lock = None + path = self.get_schema_path(key) + while True: try: lock_id = str(uuid.uuid1()) - lock = self.zk_conn.Lock(key, lock_id) + lock = self.zk_conn.Lock(path, lock_id) break except Exception as e: if count > 5: @@ -537,8 +580,10 @@ class ZKSchema(object): return False # Load the schema of a given version from a file - def load(self, version): - print(f'Loading schema version {version}') + def load(self, version, quiet=False): + if not quiet: + print(f'Loading schema version {version}') + with open(f'daemon_lib/migrations/versions/{version}.json', 'r') as sfh: self.schema = json.load(sfh) self.version = self.schema.get('version')