From 43009486ae0c356f78a83b521dc7aedd6e42d57d Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Thu, 1 Jul 2021 17:33:13 -0400 Subject: [PATCH] Move Ceph pool/volume list assembly to thread pool Same reasons as the VM list, though less impactful. --- daemon-common/ceph.py | 47 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/daemon-common/ceph.py b/daemon-common/ceph.py index 00878dc1..4b93622c 100644 --- a/daemon-common/ceph.py +++ b/daemon-common/ceph.py @@ -25,6 +25,8 @@ import json import time import math +from concurrent.futures import ThreadPoolExecutor + import daemon_lib.vm as vm import daemon_lib.common as common @@ -413,24 +415,36 @@ def remove_pool(zkhandler, name): def get_list_pool(zkhandler, limit, is_fuzzy=True): - pool_list = [] full_pool_list = zkhandler.children('base.pool') if limit: if not is_fuzzy: limit = '^' + limit + '$' + get_pool_info = dict() for pool in full_pool_list: + is_limit_match = False if limit: try: if re.match(limit, pool): - pool_list.append(getPoolInformation(zkhandler, pool)) + is_limit_match = True except Exception as e: return False, 'Regex Error: {}'.format(e) else: - pool_list.append(getPoolInformation(zkhandler, pool)) + is_limit_match = True - return True, sorted(pool_list, key=lambda x: int(x['stats']['id'])) + get_pool_info[pool] = True if is_limit_match else False + + pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]] + pool_data_list = list() + with ThreadPoolExecutor(max_workers=32, thread_name_prefix='pool_list') as executor: + futures = [] + for pool in pool_execute_list: + futures.append(executor.submit(getPoolInformation, zkhandler, pool)) + for future in futures: + pool_data_list.append(future.result()) + + return True, sorted(pool_data_list, key=lambda x: int(x['stats']['id'])) # @@ -652,7 +666,6 @@ def unmap_volume(zkhandler, pool, name): def get_list_volume(zkhandler, pool, limit, is_fuzzy=True): - volume_list = [] if pool and not verifyPool(zkhandler, pool): return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(pool) @@ -668,18 +681,36 @@ def get_list_volume(zkhandler, pool, limit, is_fuzzy=True): if not re.match(r'.*\$', limit): limit = limit + '.*' + get_volume_info = dict() for volume in full_volume_list: pool_name, volume_name = volume.split('/') + is_limit_match = False + + # Check on limit if limit: + # Try to match the limit against the volume name try: if re.match(limit, volume_name): - volume_list.append(getVolumeInformation(zkhandler, pool_name, volume_name)) + is_limit_match = True except Exception as e: return False, 'Regex Error: {}'.format(e) else: - volume_list.append(getVolumeInformation(zkhandler, pool_name, volume_name)) + is_limit_match = True - return True, sorted(volume_list, key=lambda x: str(x['name'])) + get_volume_info[volume] = True if is_limit_match else False + + # Obtain our volume data in a thread pool + volume_execute_list = [volume for volume in full_volume_list if get_volume_info[volume]] + volume_data_list = list() + with ThreadPoolExecutor(max_workers=32, thread_name_prefix='volume_list') as executor: + futures = [] + for volume in volume_execute_list: + pool_name, volume_name = volume.split('/') + futures.append(executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)) + for future in futures: + volume_data_list.append(future.result()) + + return True, sorted(volume_data_list, key=lambda x: str(x['name'])) #