From 932978401004df6100addaa216d8cfbe7743231f Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sun, 10 Dec 2023 14:50:16 -0500 Subject: [PATCH] Implement async ZK read function Adds a function, "read_many", which can take in multiple ZK keys and return the values from all of them, using asyncio to avoid reading sequentially. Initial tests show a marked improvement in read performance of multiple read()-heavy functions (e.g. "get_list()" functions) with this method. --- daemon-common/zkhandler.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 51b250df..72cff575 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -19,6 +19,7 @@ # ############################################################################### +import asyncio import os import time import uuid @@ -239,10 +240,41 @@ class ZKHandler(object): # This path is invalid; this is likely due to missing schema entries, so return None return None - return self.zk_conn.get(path)[0].decode(self.encoding) + res = self.zk_conn.get(path) + return res[0].decode(self.encoding) except NoNodeError: return None + async def read_async(self, key): + """ + Read data from a key asynchronously + """ + try: + path = self.get_schema_path(key) + if path is None: + # This path is invalid; this is likely due to missing schema entries, so return None + return None + + val = self.zk_conn.get_async(path) + data = val.get() + return data[0].decode(self.encoding) + except NoNodeError: + return None + + async def _read_many(self, keys): + """ + Async runner for read_many + """ + res = await asyncio.gather(*(self.read_async(key) for key in keys)) + return tuple(res) + + def read_many(self, keys): + """ + Read data from several keys, asynchronously. Returns a tuple of all key values once all + reads are complete. + """ + return asyncio.run(self._read_many(keys)) + def write(self, kvpairs): """ Create or update one or more keys' data