Convert NodeInstance to new zkhandler

commit 8a4a41e092
parent a48bf2d71e
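
This commit replaces the old module-level zkhandler functions (readdata, writedata, readlock, writelock, exclusivelock), each of which took an explicit zk_conn argument, with a ZKHandler instance that is passed into NodeInstance and owns the connection. Reads take a key, writes take a list of (key, value) tuples, and only the Kazoo DataWatch decorators still reach through the handler's underlying zk_conn attribute.

The following is a minimal sketch of the handler facade this commit assumes, not the real pvcnoded.zkhandler implementation: the method names and the zk_conn attribute match the calls visible in the diff below, while the class name, constructor, encoding details, and the transactional batching are assumptions.

    # Minimal sketch of the instance-based handler API used in the diff below;
    # names marked as assumptions are illustrative, not from the real module
    from kazoo.client import KazooClient

    class ZKHandlerSketch(object):
        def __init__(self, hosts):
            # The connection stays exposed so DataWatch decorators can use it
            self.zk_conn = KazooClient(hosts=hosts)
            self.zk_conn.start()

        def read(self, key):
            # Return the data stored at a key, or None if the key is absent
            if self.zk_conn.exists(key):
                return self.zk_conn.get(key)[0].decode('ascii')
            return None

        def write(self, kvpairs):
            # Apply a list of (key, value) pairs as a single transaction,
            # creating any keys that do not exist yet (batching is an
            # assumption here; the real handler may differ)
            transaction = self.zk_conn.transaction()
            for key, value in kvpairs:
                if self.zk_conn.exists(key):
                    transaction.set_data(key, str(value).encode('ascii'))
                else:
                    self.zk_conn.ensure_path(key.rsplit('/', 1)[0] or '/')
                    transaction.create(key, str(value).encode('ascii'))
            transaction.commit()

        def readlock(self, key):
            return self.zk_conn.ReadLock(key)

        def writelock(self, key):
            return self.zk_conn.WriteLock(key)

        def exclusivelock(self, key):
            return self.zk_conn.Lock(key)

    # Example: batch two related state keys in one call (paths illustrative)
    zkhandler = ZKHandlerSketch('127.0.0.1:2181')
    zkhandler.write([
        ('/nodes/node1/routerstate', 'secondary'),
        ('/nodes/node1/domainstate', 'ready'),
    ])
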
@@ -830,7 +830,7 @@ def update_nodes(new_node_list):
     # Add any missing nodes to the list
     for node in new_node_list:
         if node not in node_list:
-            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler.zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
+            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
 
     # Remove any deleted nodes from the list
     for node in node_list:
@@ -23,23 +23,22 @@ import time
 
 from threading import Thread
 
-import pvcnoded.zkhandler as zkhandler
 import pvcnoded.common as common
 
 
 class NodeInstance(object):
     # Initialization function
-    def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
+    def __init__(self, name, this_node, zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
         # Passed-in variables on creation
         self.name = name
         self.this_node = this_node
-        self.zk_conn = zk_conn
+        self.zkhandler = zkhandler
         self.config = config
         self.logger = logger
         # Which node is primary
         self.primary_node = None
         # States
-        self.daemon_mode = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonmode'.format(self.name))
+        self.daemon_mode = self.zkhandler.read('/nodes/{}/daemonmode'.format(self.name))
         self.daemon_state = 'stop'
         self.router_state = 'client'
         self.domain_state = 'ready'
@@ -91,7 +90,7 @@ class NodeInstance(object):
         self.flush_stopper = False
 
         # Zookeeper handlers for changed states
-        @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
         def watch_node_daemonstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -106,7 +105,7 @@
             if data != self.daemon_state:
                 self.daemon_state = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
         def watch_node_routerstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -135,9 +134,11 @@
                         transition_thread.start()
                 else:
                     # We did nothing, so just become secondary state
-                    zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
+                    self.zkhandler.write([
+                        ('/nodes/{}/routerstate'.format(self.name), 'secondary')
+                    ])
 
-        @self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
         def watch_node_domainstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -170,7 +171,7 @@
                 self.flush_thread = Thread(target=self.unflush, args=(), kwargs={})
                 self.flush_thread.start()
 
-        @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
         def watch_node_memfree(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -185,7 +186,7 @@
             if data != self.memfree:
                 self.memfree = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
         def watch_node_memused(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -200,7 +201,7 @@
             if data != self.memused:
                 self.memused = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
         def watch_node_memalloc(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -215,7 +216,7 @@
             if data != self.memalloc:
                 self.memalloc = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
         def watch_node_vcpualloc(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -230,7 +231,7 @@
             if data != self.vcpualloc:
                 self.vcpualloc = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
         def watch_node_runningdomains(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -245,7 +246,7 @@
             if data != self.domain_list:
                 self.domain_list = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
         def watch_node_domainscount(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -323,26 +324,30 @@
         Acquire primary coordinator status from a peer node
         """
         # Lock the primary node until transition is complete
-        primary_lock = zkhandler.exclusivelock(self.zk_conn, '/config/primary_node')
+        primary_lock = self.zkhandler.exclusivelock('/config/primary_node')
         primary_lock.acquire()
 
         # Ensure our lock key is populated
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
 
         # Synchronize nodes A (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase A', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase A', state='o')
         time.sleep(1) # Time for reader to acquire the lock
         self.logger.out('Releasing write lock for synchronization phase A', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase A', state='o')
         time.sleep(0.1) # Time for new writer to acquire the lock
 
         # Synchronize nodes B (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase B', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase B', state='o')
@@ -351,7 +356,7 @@
         self.logger.out('Released read lock for synchronization phase B', state='o')
 
         # Synchronize nodes C (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase C', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase C', state='o')
@@ -367,12 +372,14 @@
         )
         common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing write lock for synchronization phase C', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase C', state='o')
 
         # Synchronize nodes D (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase D', state='o')
@@ -397,12 +404,14 @@
         )
         common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
         self.logger.out('Releasing write lock for synchronization phase D', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase D', state='o')
 
         # Synchronize nodes E (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase E', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase E', state='o')
@@ -418,12 +427,14 @@
         )
         common.createIPAddress('169.254.169.254', '32', 'lo')
         self.logger.out('Releasing write lock for synchronization phase E', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase E', state='o')
 
         # Synchronize nodes F (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase F', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase F', state='o')
@@ -432,12 +443,14 @@
         for network in self.d_network:
             self.d_network[network].createGateways()
         self.logger.out('Releasing write lock for synchronization phase F', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase F', state='o')
 
         # Synchronize nodes G (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase G', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase G', state='o')
@@ -504,14 +517,18 @@
         else:
             self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e')
         self.logger.out('Releasing write lock for synchronization phase G', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase G', state='o')
 
         # Wait 2 seconds for everything to stabilize before we declare all-done
         time.sleep(2)
         primary_lock.release()
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'primary'})
+        self.zkhandler.write([
+            ('/nodes/{}/routerstate'.format(self.name), 'primary')
+        ])
         self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
 
     def become_secondary(self):
@@ -521,7 +538,7 @@
         time.sleep(0.2) # Initial delay for the first writer to grab the lock
 
         # Synchronize nodes A (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase A', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase A', state='o')
@@ -530,7 +547,7 @@
         self.logger.out('Released read lock for synchronization phase A', state='o')
 
         # Synchronize nodes B (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase B', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase B', state='o')
@@ -541,7 +558,9 @@
         for network in self.d_network:
             self.d_network[network].stopDHCPServer()
         self.logger.out('Releasing write lock for synchronization phase B', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase B', state='o')
         # 3. Stop client API
@@ -553,7 +572,7 @@
         time.sleep(0.1) # Time for new writer to acquire the lock
 
         # Synchronize nodes C (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase C', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase C', state='o')
@@ -572,7 +591,7 @@
         self.logger.out('Released read lock for synchronization phase C', state='o')
 
         # Synchronize nodes D (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase D', state='o')
@@ -600,7 +619,7 @@
         self.logger.out('Released read lock for synchronization phase D', state='o')
 
         # Synchronize nodes E (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase E', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase E', state='o')
@@ -619,7 +638,7 @@
         self.logger.out('Released read lock for synchronization phase E', state='o')
 
         # Synchronize nodes F (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase F', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase F', state='o')
@@ -631,7 +650,7 @@
         self.logger.out('Released read lock for synchronization phase F', state='o')
 
         # Synchronize nodes G (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase G', state='i')
         try:
             lock.acquire(timeout=60) # Don't wait forever and completely block us
@@ -644,7 +663,9 @@
 
         # Wait 2 seconds for everything to stabilize before we declare all-done
         time.sleep(2)
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
+        self.zkhandler.write([
+            ('/nodes/{}/routerstate'.format(self.name), 'secondary')
+        ])
         self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
 
     # Flush all VMs on the host
@@ -664,38 +685,42 @@
             self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
 
             # Don't replace the previous node if the VM is already migrated
-            if zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)):
-                current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
+            if self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)):
+                current_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
             else:
-                current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
+                current_node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid))
 
-            target_node = common.findTargetNode(self.zk_conn, self.config, self.logger, dom_uuid)
+            target_node = common.findTargetNode(self.zkhandler, self.config, self.logger, dom_uuid)
             if target_node == current_node:
                 target_node = None
 
             if target_node is None:
                 self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
-                zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(dom_uuid): 'shutdown'})
-                zkhandler.writedata(self.zk_conn, {'/domains/{}/node_autostart'.format(dom_uuid): 'True'})
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'shutdown'),
+                    ('/domains/{}/node_autostart'.format(dom_uuid), 'True')
+                ])
             else:
                 self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
-                zkhandler.writedata(self.zk_conn, {
-                    '/domains/{}/state'.format(dom_uuid): 'migrate',
-                    '/domains/{}/node'.format(dom_uuid): target_node,
-                    '/domains/{}/lastnode'.format(dom_uuid): current_node
-                })
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'migrate'),
+                    ('/domains/{}/node'.format(dom_uuid), target_node),
+                    ('/domains/{}/lastnode'.format(dom_uuid), current_node)
+                ])
 
             # Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
             ticks = 0
-            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
+            while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
                 ticks += 1
                 if ticks > 600:
                     # Abort if we've waited 120 seconds; the VM is in a bad state, so just continue
                     break
                 time.sleep(0.2)
 
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.name): ''})
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/domainstate'.format(self.name): 'flushed'})
+        self.zkhandler.write([
+            ('/nodes/{}/runningdomains'.format(self.name), ''),
+            ('/nodes/{}/domainstate'.format(self.name), 'flushed')
+        ])
         self.flush_thread = None
         self.flush_stopper = False
         return
@@ -712,20 +737,20 @@
                 return
 
             # Handle autostarts
-            autostart = zkhandler.readdata(self.zk_conn, '/domains/{}/node_autostart'.format(dom_uuid))
-            node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
+            autostart = self.zkhandler.read('/domains/{}/node_autostart'.format(dom_uuid))
+            node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid))
             if autostart == 'True' and node == self.name:
                 self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
-                zkhandler.writedata(self.zk_conn, {
-                    '/domains/{}/state'.format(dom_uuid): 'start',
-                    '/domains/{}/node'.format(dom_uuid): self.name,
-                    '/domains/{}/lastnode'.format(dom_uuid): '',
-                    '/domains/{}/node_autostart'.format(dom_uuid): 'False'
-                })
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'start'),
+                    ('/domains/{}/node'.format(dom_uuid), self.name),
+                    ('/domains/{}/lastnode'.format(dom_uuid), ''),
+                    ('/domains/{}/node_autostart'.format(dom_uuid), 'False')
+                ])
                 continue
 
             try:
-                last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
+                last_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
             except Exception:
                 continue
 
@@ -733,17 +758,19 @@
                 continue
 
             self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
-            zkhandler.writedata(self.zk_conn, {
-                '/domains/{}/state'.format(dom_uuid): 'migrate',
-                '/domains/{}/node'.format(dom_uuid): self.name,
-                '/domains/{}/lastnode'.format(dom_uuid): ''
-            })
+            self.zkhandler.write([
+                ('/domains/{}/state'.format(dom_uuid), 'migrate'),
+                ('/domains/{}/node'.format(dom_uuid), self.name),
+                ('/domains/{}/lastnode'.format(dom_uuid), '')
+            ])
 
             # Wait for the VM to migrate back
-            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
+            while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
                 time.sleep(0.1)
 
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/domainstate'.format(self.name): 'ready'})
+        self.zkhandler.write([
+            ('/nodes/{}/domainstate'.format(self.name), 'ready')
+        ])
        self.flush_thread = None
        self.flush_stopper = False
        return
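
The become_primary/become_secondary phases in this diff coordinate the outgoing and incoming primary through alternating write and read locks on /locks/primary_node. The snippet below is a minimal self-contained illustration of that handoff using plain Kazoo lock recipes, not the project's code; the host string is a placeholder, and in PVC the reader and writer run on different nodes rather than in one process.

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()
    zk.ensure_path('/locks/primary_node')

    # Writer side (the node driving a phase): hold the write lock while
    # performing the phase's work, then release it so the peer can proceed
    wlock = zk.WriteLock('/locks/primary_node')
    wlock.acquire()
    # ... perform this phase's work ...
    wlock.release()

    # Reader side (the peer node): acquire() blocks until the writer releases
    rlock = zk.ReadLock('/locks/primary_node')
    rlock.acquire()
    # ... perform the corresponding work on the peer ...
    rlock.release()

    zk.stop()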