Compare commits

..

2 Commits

Author SHA1 Message Date
c08c3b2d7d Improve thread timeouts in keepalive
Avoids various parts of the keepalive deadlocking waiting on data that
will never come when various internal processes fail. This should ensure
based on testing that the keepalive will always finish in <5 seconds.
2024-10-10 15:33:47 -04:00
4c0d90b517 Add read lock timeouts to prevent deadlocks 2024-10-10 15:19:05 -04:00
2 changed files with 38 additions and 30 deletions

View File

@ -438,8 +438,11 @@ class NodeInstance(object):
# Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase B", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase B", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase B", state="i")
lock.release()
self.logger.out("Released read lock for synchronization phase B", state="o")
@ -648,8 +651,11 @@ class NodeInstance(object):
# Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase A", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase A", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase A", state="i")
lock.release()
self.logger.out("Released read lock for synchronization phase A", state="o")
@ -682,8 +688,11 @@ class NodeInstance(object):
# Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase C", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase C", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 5. Remove Upstream floating IP
self.logger.out(
"Removing floating upstream IP {}/{} from interface {}".format(
@ -701,8 +710,11 @@ class NodeInstance(object):
# Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase D", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase D", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 6. Remove Cluster & Storage floating IP
self.logger.out(
"Removing floating management IP {}/{} from interface {}".format(
@ -729,8 +741,11 @@ class NodeInstance(object):
# Synchronize nodes E (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase E", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase E", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 7. Remove Metadata link-local IP
self.logger.out(
"Removing Metadata link-local IP {}/{} from interface {}".format(
@ -746,8 +761,11 @@ class NodeInstance(object):
# Synchronize nodes F (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase F", state="i")
lock.acquire()
self.logger.out("Acquired read lock for synchronization phase F", state="o")
try:
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 8. Remove gateway IPs
for network in self.d_network:
self.d_network[network].removeGateways()
@ -759,7 +777,7 @@ class NodeInstance(object):
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase G", state="i")
try:
lock.acquire(timeout=60) # Don't wait forever and completely block us
lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass

View File

@ -756,29 +756,21 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
# Join against running threads
if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"])
vm_stats_thread.join(timeout=config["keepalive_interval"] - 1)
if vm_stats_thread.is_alive():
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
if config["enable_storage"]:
ceph_stats_thread.join(timeout=config["keepalive_interval"])
ceph_stats_thread.join(timeout=config["keepalive_interval"] - 1)
if ceph_stats_thread.is_alive():
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
# Get information from thread queues
if config["enable_hypervisor"]:
try:
this_node.domains_count = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memalloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memprov = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.vcpualloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.domains_count = vm_thread_queue.get(timeout=0.1)
this_node.memalloc = vm_thread_queue.get(timeout=0.1)
this_node.memprov = vm_thread_queue.get(timeout=0.1)
this_node.vcpualloc = vm_thread_queue.get(timeout=0.1)
except Exception:
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
else:
@ -789,9 +781,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
if config["enable_storage"]:
try:
osds_this_node = ceph_thread_queue.get(
timeout=(config["keepalive_interval"] - 1)
)
osds_this_node = ceph_thread_queue.get(timeout=0.1)
except Exception:
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
osds_this_node = "?"