Add timeouts to queue gets and adjust thread join timeouts

Ensure that all keepalive timeouts are set (preventing the queue.get()
calls from blocking forever) and set the thread join timeouts to match.
Everything here is thus limited to keepalive_interval seconds
(default 5s) to keep it uniform.
This commit is contained in:
Joshua Boniface 2021-09-27 16:10:27 -04:00
parent e514eed414
commit 3b41759262
1 changed file with 13 additions and 12 deletions

View File

@@ -592,23 +592,23 @@ def node_keepalive(logger, config, zkhandler, this_node):
# Join against running threads # Join against running threads
if config['enable_hypervisor']: if config['enable_hypervisor']:
vm_stats_thread.join(timeout=4.0) vm_stats_thread.join(timeout=config['keepalive_interval'])
if vm_stats_thread.is_alive(): if vm_stats_thread.is_alive():
logger.out('VM stats gathering exceeded 4s timeout, continuing', state='w') logger.out('VM stats gathering exceeded timeout, continuing', state='w')
if config['enable_storage']: if config['enable_storage']:
ceph_stats_thread.join(timeout=4.0) ceph_stats_thread.join(timeout=config['keepalive_interval'])
if ceph_stats_thread.is_alive(): if ceph_stats_thread.is_alive():
logger.out('Ceph stats gathering exceeded 4s timeout, continuing', state='w') logger.out('Ceph stats gathering exceeded timeout, continuing', state='w')
# Get information from thread queues # Get information from thread queues
if config['enable_hypervisor']: if config['enable_hypervisor']:
try: try:
this_node.domains_count = vm_thread_queue.get() this_node.domains_count = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.memalloc = vm_thread_queue.get() this_node.memalloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.memprov = vm_thread_queue.get() this_node.memprov = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.vcpualloc = vm_thread_queue.get() this_node.vcpualloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
except Exception: except Exception:
pass logger.out('VM stats queue get exceeded timeout, continuing', state='w')
else: else:
this_node.domains_count = 0 this_node.domains_count = 0
this_node.memalloc = 0 this_node.memalloc = 0
@@ -617,10 +617,11 @@ def node_keepalive(logger, config, zkhandler, this_node):
if config['enable_storage']: if config['enable_storage']:
try: try:
ceph_health_colour = ceph_thread_queue.get() ceph_health_colour = ceph_thread_queue.get(timeout=config['keepalive_interval'])
ceph_health = ceph_thread_queue.get() ceph_health = ceph_thread_queue.get(timeout=config['keepalive_interval'])
osds_this_node = ceph_thread_queue.get() osds_this_node = ceph_thread_queue.get(timeout=config['keepalive_interval'])
except Exception: except Exception:
logger.out('Ceph stats queue get exceeded timeout, continuing', state='w')
ceph_health_colour = logger.fmt_cyan ceph_health_colour = logger.fmt_cyan
ceph_health = 'UNKNOWN' ceph_health = 'UNKNOWN'
osds_this_node = '?' osds_this_node = '?'