Compare commits

..

2 Commits

Author SHA1 Message Date
c08c3b2d7d Improve thread timeouts in keepalive
Prevents various parts of the keepalive from deadlocking while waiting on
data that will never arrive when various internal processes fail. Based on
testing, this should ensure that the keepalive always finishes in <5 seconds.
2024-10-10 15:33:47 -04:00
4c0d90b517 Add read lock timeouts to prevent deadlocks 2024-10-10 15:19:05 -04:00
2 changed files with 38 additions and 30 deletions

View File

@ -438,8 +438,11 @@ class NodeInstance(object):
# Synchronize nodes B (I am reader) # Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase B", state="i") self.logger.out("Acquiring read lock for synchronization phase B", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase B", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase B", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase B", state="i") self.logger.out("Releasing read lock for synchronization phase B", state="i")
lock.release() lock.release()
self.logger.out("Released read lock for synchronization phase B", state="o") self.logger.out("Released read lock for synchronization phase B", state="o")
@ -648,8 +651,11 @@ class NodeInstance(object):
# Synchronize nodes A (I am reader) # Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase A", state="i") self.logger.out("Acquiring read lock for synchronization phase A", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase A", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase A", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase A", state="i") self.logger.out("Releasing read lock for synchronization phase A", state="i")
lock.release() lock.release()
self.logger.out("Released read lock for synchronization phase A", state="o") self.logger.out("Released read lock for synchronization phase A", state="o")
@ -682,8 +688,11 @@ class NodeInstance(object):
# Synchronize nodes C (I am reader) # Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase C", state="i") self.logger.out("Acquiring read lock for synchronization phase C", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase C", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase C", state="o")
except Exception:
pass
# 5. Remove Upstream floating IP # 5. Remove Upstream floating IP
self.logger.out( self.logger.out(
"Removing floating upstream IP {}/{} from interface {}".format( "Removing floating upstream IP {}/{} from interface {}".format(
@ -701,8 +710,11 @@ class NodeInstance(object):
# Synchronize nodes D (I am reader) # Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase D", state="i") self.logger.out("Acquiring read lock for synchronization phase D", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase D", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase D", state="o")
except Exception:
pass
# 6. Remove Cluster & Storage floating IP # 6. Remove Cluster & Storage floating IP
self.logger.out( self.logger.out(
"Removing floating management IP {}/{} from interface {}".format( "Removing floating management IP {}/{} from interface {}".format(
@ -729,8 +741,11 @@ class NodeInstance(object):
# Synchronize nodes E (I am reader) # Synchronize nodes E (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase E", state="i") self.logger.out("Acquiring read lock for synchronization phase E", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase E", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase E", state="o")
except Exception:
pass
# 7. Remove Metadata link-local IP # 7. Remove Metadata link-local IP
self.logger.out( self.logger.out(
"Removing Metadata link-local IP {}/{} from interface {}".format( "Removing Metadata link-local IP {}/{} from interface {}".format(
@ -746,8 +761,11 @@ class NodeInstance(object):
# Synchronize nodes F (I am reader) # Synchronize nodes F (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase F", state="i") self.logger.out("Acquiring read lock for synchronization phase F", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase F", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase F", state="o")
except Exception:
pass
# 8. Remove gateway IPs # 8. Remove gateway IPs
for network in self.d_network: for network in self.d_network:
self.d_network[network].removeGateways() self.d_network[network].removeGateways()
@ -759,7 +777,7 @@ class NodeInstance(object):
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase G", state="i") self.logger.out("Acquiring read lock for synchronization phase G", state="i")
try: try:
lock.acquire(timeout=60) # Don't wait forever and completely block us lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o") self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception: except Exception:
pass pass

View File

@ -756,29 +756,21 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
# Join against running threads # Join against running threads
if config["enable_hypervisor"]: if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"]) vm_stats_thread.join(timeout=config["keepalive_interval"] - 1)
if vm_stats_thread.is_alive(): if vm_stats_thread.is_alive():
logger.out("VM stats gathering exceeded timeout, continuing", state="w") logger.out("VM stats gathering exceeded timeout, continuing", state="w")
if config["enable_storage"]: if config["enable_storage"]:
ceph_stats_thread.join(timeout=config["keepalive_interval"]) ceph_stats_thread.join(timeout=config["keepalive_interval"] - 1)
if ceph_stats_thread.is_alive(): if ceph_stats_thread.is_alive():
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w") logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
# Get information from thread queues # Get information from thread queues
if config["enable_hypervisor"]: if config["enable_hypervisor"]:
try: try:
this_node.domains_count = vm_thread_queue.get( this_node.domains_count = vm_thread_queue.get(timeout=0.1)
timeout=config["keepalive_interval"] this_node.memalloc = vm_thread_queue.get(timeout=0.1)
) this_node.memprov = vm_thread_queue.get(timeout=0.1)
this_node.memalloc = vm_thread_queue.get( this_node.vcpualloc = vm_thread_queue.get(timeout=0.1)
timeout=config["keepalive_interval"]
)
this_node.memprov = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.vcpualloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
except Exception: except Exception:
logger.out("VM stats queue get exceeded timeout, continuing", state="w") logger.out("VM stats queue get exceeded timeout, continuing", state="w")
else: else:
@ -789,9 +781,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
if config["enable_storage"]: if config["enable_storage"]:
try: try:
osds_this_node = ceph_thread_queue.get( osds_this_node = ceph_thread_queue.get(timeout=0.1)
timeout=(config["keepalive_interval"] - 1)
)
except Exception: except Exception:
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w") logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
osds_this_node = "?" osds_this_node = "?"