Add read lock timeouts to prevent deadlocks

This commit is contained in:
Joshua Boniface 2024-10-10 15:19:05 -04:00
parent 70c588d3a8
commit 4c0d90b517
1 changed file with 31 additions and 13 deletions

View File

@@ -438,8 +438,11 @@ class NodeInstance(object):
# Synchronize nodes B (I am reader) # Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase B", state="i") self.logger.out("Acquiring read lock for synchronization phase B", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase B", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase B", state="i") self.logger.out("Releasing read lock for synchronization phase B", state="i")
lock.release() lock.release()
self.logger.out("Released read lock for synchronization phase B", state="o") self.logger.out("Released read lock for synchronization phase B", state="o")
@@ -648,8 +651,11 @@ class NodeInstance(object):
# Synchronize nodes A (I am reader) # Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase A", state="i") self.logger.out("Acquiring read lock for synchronization phase A", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase A", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
self.logger.out("Releasing read lock for synchronization phase A", state="i") self.logger.out("Releasing read lock for synchronization phase A", state="i")
lock.release() lock.release()
self.logger.out("Released read lock for synchronization phase A", state="o") self.logger.out("Released read lock for synchronization phase A", state="o")
@@ -682,8 +688,11 @@ class NodeInstance(object):
# Synchronize nodes C (I am reader) # Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase C", state="i") self.logger.out("Acquiring read lock for synchronization phase C", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase C", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 5. Remove Upstream floating IP # 5. Remove Upstream floating IP
self.logger.out( self.logger.out(
"Removing floating upstream IP {}/{} from interface {}".format( "Removing floating upstream IP {}/{} from interface {}".format(
@@ -701,8 +710,11 @@ class NodeInstance(object):
# Synchronize nodes D (I am reader) # Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase D", state="i") self.logger.out("Acquiring read lock for synchronization phase D", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase D", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 6. Remove Cluster & Storage floating IP # 6. Remove Cluster & Storage floating IP
self.logger.out( self.logger.out(
"Removing floating management IP {}/{} from interface {}".format( "Removing floating management IP {}/{} from interface {}".format(
@@ -729,8 +741,11 @@ class NodeInstance(object):
# Synchronize nodes E (I am reader) # Synchronize nodes E (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase E", state="i") self.logger.out("Acquiring read lock for synchronization phase E", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase E", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 7. Remove Metadata link-local IP # 7. Remove Metadata link-local IP
self.logger.out( self.logger.out(
"Removing Metadata link-local IP {}/{} from interface {}".format( "Removing Metadata link-local IP {}/{} from interface {}".format(
@@ -746,8 +761,11 @@ class NodeInstance(object):
# Synchronize nodes F (I am reader) # Synchronize nodes F (I am reader)
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase F", state="i") self.logger.out("Acquiring read lock for synchronization phase F", state="i")
lock.acquire() try:
self.logger.out("Acquired read lock for synchronization phase F", state="o") lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception:
pass
# 8. Remove gateway IPs # 8. Remove gateway IPs
for network in self.d_network: for network in self.d_network:
self.d_network[network].removeGateways() self.d_network[network].removeGateways()
@@ -759,7 +777,7 @@ class NodeInstance(object):
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock") lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
self.logger.out("Acquiring read lock for synchronization phase G", state="i") self.logger.out("Acquiring read lock for synchronization phase G", state="i")
try: try:
lock.acquire(timeout=60) # Don't wait forever and completely block us lock.acquire(timeout=5) # Don't wait forever and completely block us
self.logger.out("Acquired read lock for synchronization phase G", state="o") self.logger.out("Acquired read lock for synchronization phase G", state="o")
except Exception: except Exception:
pass pass