Implement async ZK read function

Adds a function, "read_many", which can take in multiple ZK keys and
return the values from all of them, using asyncio to avoid reading
sequentially.

Initial tests show a marked improvement in read performance of multiple
read()-heavy functions (e.g. "get_list()" functions) with this method.
This commit is contained in:
Joshua Boniface 2023-12-10 14:50:16 -05:00
parent 9dc5097dbc
commit 9329784010
1 changed files with 33 additions and 1 deletions

View File

@ -19,6 +19,7 @@
# #
############################################################################### ###############################################################################
import asyncio
import os import os
import time import time
import uuid import uuid
@ -239,10 +240,41 @@ class ZKHandler(object):
# This path is invalid; this is likely due to missing schema entries, so return None # This path is invalid; this is likely due to missing schema entries, so return None
return None return None
return self.zk_conn.get(path)[0].decode(self.encoding) res = self.zk_conn.get(path)
return res[0].decode(self.encoding)
except NoNodeError: except NoNodeError:
return None return None
async def read_async(self, key):
"""
Read data from a key asynchronously
"""
try:
path = self.get_schema_path(key)
if path is None:
# This path is invalid; this is likely due to missing schema entries, so return None
return None
val = self.zk_conn.get_async(path)
data = val.get()
return data[0].decode(self.encoding)
except NoNodeError:
return None
async def _read_many(self, keys):
"""
Async runner for read_many
"""
res = await asyncio.gather(*(self.read_async(key) for key in keys))
return tuple(res)
def read_many(self, keys):
"""
Read data from several keys, asynchronously. Returns a tuple of all key values once all
reads are complete.
"""
return asyncio.run(self._read_many(keys))
def write(self, kvpairs): def write(self, kvpairs):
""" """
Create or update one or more keys' data Create or update one or more keys' data