Compare commits
No commits in common. "c08c3b2d7d832939d0d9ceb5e8c889223cb7758b" and "70c588d3a8af1593b1f6a90d6bf189a8043e6b4e" have entirely different histories.
c08c3b2d7d
...
70c588d3a8
@ -438,11 +438,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes B (I am reader)
|
# Synchronize nodes B (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase B", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase B", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase B", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self.logger.out("Releasing read lock for synchronization phase B", state="i")
|
self.logger.out("Releasing read lock for synchronization phase B", state="i")
|
||||||
lock.release()
|
lock.release()
|
||||||
self.logger.out("Released read lock for synchronization phase B", state="o")
|
self.logger.out("Released read lock for synchronization phase B", state="o")
|
||||||
@ -651,11 +648,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes A (I am reader)
|
# Synchronize nodes A (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase A", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase A", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase A", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self.logger.out("Releasing read lock for synchronization phase A", state="i")
|
self.logger.out("Releasing read lock for synchronization phase A", state="i")
|
||||||
lock.release()
|
lock.release()
|
||||||
self.logger.out("Released read lock for synchronization phase A", state="o")
|
self.logger.out("Released read lock for synchronization phase A", state="o")
|
||||||
@ -688,11 +682,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes C (I am reader)
|
# Synchronize nodes C (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase C", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase C", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase C", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 5. Remove Upstream floating IP
|
# 5. Remove Upstream floating IP
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Removing floating upstream IP {}/{} from interface {}".format(
|
"Removing floating upstream IP {}/{} from interface {}".format(
|
||||||
@ -710,11 +701,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes D (I am reader)
|
# Synchronize nodes D (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase D", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase D", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase D", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 6. Remove Cluster & Storage floating IP
|
# 6. Remove Cluster & Storage floating IP
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Removing floating management IP {}/{} from interface {}".format(
|
"Removing floating management IP {}/{} from interface {}".format(
|
||||||
@ -741,11 +729,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes E (I am reader)
|
# Synchronize nodes E (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase E", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase E", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase E", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 7. Remove Metadata link-local IP
|
# 7. Remove Metadata link-local IP
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Removing Metadata link-local IP {}/{} from interface {}".format(
|
"Removing Metadata link-local IP {}/{} from interface {}".format(
|
||||||
@ -761,11 +746,8 @@ class NodeInstance(object):
|
|||||||
# Synchronize nodes F (I am reader)
|
# Synchronize nodes F (I am reader)
|
||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase F", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase F", state="i")
|
||||||
try:
|
lock.acquire()
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
self.logger.out("Acquired read lock for synchronization phase F", state="o")
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 8. Remove gateway IPs
|
# 8. Remove gateway IPs
|
||||||
for network in self.d_network:
|
for network in self.d_network:
|
||||||
self.d_network[network].removeGateways()
|
self.d_network[network].removeGateways()
|
||||||
@ -777,7 +759,7 @@ class NodeInstance(object):
|
|||||||
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
lock = self.zkhandler.readlock("base.config.primary_node.sync_lock")
|
||||||
self.logger.out("Acquiring read lock for synchronization phase G", state="i")
|
self.logger.out("Acquiring read lock for synchronization phase G", state="i")
|
||||||
try:
|
try:
|
||||||
lock.acquire(timeout=5) # Don't wait forever and completely block us
|
lock.acquire(timeout=60) # Don't wait forever and completely block us
|
||||||
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
self.logger.out("Acquired read lock for synchronization phase G", state="o")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
@ -756,21 +756,29 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
|
|||||||
|
|
||||||
# Join against running threads
|
# Join against running threads
|
||||||
if config["enable_hypervisor"]:
|
if config["enable_hypervisor"]:
|
||||||
vm_stats_thread.join(timeout=config["keepalive_interval"] - 1)
|
vm_stats_thread.join(timeout=config["keepalive_interval"])
|
||||||
if vm_stats_thread.is_alive():
|
if vm_stats_thread.is_alive():
|
||||||
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
|
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
|
||||||
if config["enable_storage"]:
|
if config["enable_storage"]:
|
||||||
ceph_stats_thread.join(timeout=config["keepalive_interval"] - 1)
|
ceph_stats_thread.join(timeout=config["keepalive_interval"])
|
||||||
if ceph_stats_thread.is_alive():
|
if ceph_stats_thread.is_alive():
|
||||||
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
|
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
|
||||||
|
|
||||||
# Get information from thread queues
|
# Get information from thread queues
|
||||||
if config["enable_hypervisor"]:
|
if config["enable_hypervisor"]:
|
||||||
try:
|
try:
|
||||||
this_node.domains_count = vm_thread_queue.get(timeout=0.1)
|
this_node.domains_count = vm_thread_queue.get(
|
||||||
this_node.memalloc = vm_thread_queue.get(timeout=0.1)
|
timeout=config["keepalive_interval"]
|
||||||
this_node.memprov = vm_thread_queue.get(timeout=0.1)
|
)
|
||||||
this_node.vcpualloc = vm_thread_queue.get(timeout=0.1)
|
this_node.memalloc = vm_thread_queue.get(
|
||||||
|
timeout=config["keepalive_interval"]
|
||||||
|
)
|
||||||
|
this_node.memprov = vm_thread_queue.get(
|
||||||
|
timeout=config["keepalive_interval"]
|
||||||
|
)
|
||||||
|
this_node.vcpualloc = vm_thread_queue.get(
|
||||||
|
timeout=config["keepalive_interval"]
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
|
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
|
||||||
else:
|
else:
|
||||||
@ -781,7 +789,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
|
|||||||
|
|
||||||
if config["enable_storage"]:
|
if config["enable_storage"]:
|
||||||
try:
|
try:
|
||||||
osds_this_node = ceph_thread_queue.get(timeout=0.1)
|
osds_this_node = ceph_thread_queue.get(
|
||||||
|
timeout=(config["keepalive_interval"] - 1)
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
|
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
|
||||||
osds_this_node = "?"
|
osds_this_node = "?"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user