Use transitional takeover states for migration
Use a pair of transitional states, "takeover" and "relinquish", when transitioning between primary and secondary coordinator states. This provides a cluster-wide record that the nodes are still working during their synchronous transition states, and should allow clients to determine when the node(s) have fully switched over. Also add an additional 2 seconds of wait at the end of the transition jobs to ensure everything has had a chance to start before proceeding. References #72
This commit is contained in:
parent
8678dedfea
commit
d2a5fe59c0
|
@ -575,14 +575,17 @@ def cleanup():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Force into secondary network state if needed
|
# Force into secondary network state if needed
|
||||||
if zkhandler.readdata(zk_conn, '/nodes/{}/routerstate'.format(myhostname)) == 'primary':
|
try:
|
||||||
is_primary = True
|
if this_node.router_state == 'primary':
|
||||||
zkhandler.writedata(zk_conn, {
|
is_primary = True
|
||||||
'/nodes/{}/routerstate'.format(myhostname): 'secondary',
|
zkhandler.writedata(zk_conn, {
|
||||||
'/primary_node': 'none'
|
'/primary_node': 'none'
|
||||||
})
|
})
|
||||||
logger.out('Waiting 5 seconds for primary migration', state='s')
|
logger.out('Waiting for primary migration', state='s')
|
||||||
time.sleep(5)
|
while this_node.router_state != 'secondary':
|
||||||
|
time.sleep(1)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
# Set stop state in Zookeeper
|
# Set stop state in Zookeeper
|
||||||
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
|
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
|
||||||
|
@ -825,9 +828,10 @@ def update_primary(new_primary, stat, event=''):
|
||||||
logger.out('Contending for primary coordinator state', state='i')
|
logger.out('Contending for primary coordinator state', state='i')
|
||||||
zkhandler.writedata(zk_conn, {'/primary_node': myhostname})
|
zkhandler.writedata(zk_conn, {'/primary_node': myhostname})
|
||||||
elif new_primary == myhostname:
|
elif new_primary == myhostname:
|
||||||
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'primary'})
|
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'takeover'})
|
||||||
else:
|
else:
|
||||||
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'secondary'})
|
if this_node.router_state != 'secondary':
|
||||||
|
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'relinquish'})
|
||||||
else:
|
else:
|
||||||
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'client'})
|
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'client'})
|
||||||
|
|
||||||
|
|
|
@ -117,16 +117,19 @@ class NodeInstance(object):
|
||||||
if data != self.router_state:
|
if data != self.router_state:
|
||||||
self.router_state = data
|
self.router_state = data
|
||||||
if self.config['enable_networking']:
|
if self.config['enable_networking']:
|
||||||
if self.router_state == 'primary':
|
if self.router_state == 'takeover':
|
||||||
self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
|
self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
|
||||||
transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={})
|
transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={})
|
||||||
transition_thread.start()
|
transition_thread.start()
|
||||||
else:
|
if self.router_state == 'relinquish':
|
||||||
# Skip becoming secondary unless already running
|
# Skip becoming secondary unless already running
|
||||||
if self.daemon_state == 'run' or self.daemon_state == 'shutdown':
|
if self.daemon_state == 'run' or self.daemon_state == 'shutdown':
|
||||||
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
|
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
|
||||||
transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={})
|
transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={})
|
||||||
transition_thread.start()
|
transition_thread.start()
|
||||||
|
else:
|
||||||
|
# We did nothing, so just become secondary state
|
||||||
|
zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
|
||||||
|
|
||||||
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
|
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
|
||||||
def watch_node_domainstate(data, stat, event=''):
|
def watch_node_domainstate(data, stat, event=''):
|
||||||
|
@ -428,8 +431,8 @@ class NodeInstance(object):
|
||||||
self.logger.out('Setting Patroni leader to this node', state='i')
|
self.logger.out('Setting Patroni leader to this node', state='i')
|
||||||
tick = 1
|
tick = 1
|
||||||
patroni_failed = True
|
patroni_failed = True
|
||||||
# As long as we're primary, keep trying to set the Patroni leader to us
|
# As long as we're in takeover, keep trying to set the Patroni leader to us
|
||||||
while self.router_state == 'primary':
|
while self.router_state == 'takeover':
|
||||||
# Switch Patroni leader to the local instance
|
# Switch Patroni leader to the local instance
|
||||||
retcode, stdout, stderr = common.run_os_command(
|
retcode, stdout, stderr = common.run_os_command(
|
||||||
"""
|
"""
|
||||||
|
@ -489,7 +492,10 @@ class NodeInstance(object):
|
||||||
lock.release()
|
lock.release()
|
||||||
self.logger.out('Released write lock for synchronization G', state='o')
|
self.logger.out('Released write lock for synchronization G', state='o')
|
||||||
|
|
||||||
|
# Wait 2 seconds for everything to stabilize before we declare all-done
|
||||||
|
time.sleep(2)
|
||||||
primary_lock.release()
|
primary_lock.release()
|
||||||
|
zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'primary'})
|
||||||
self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
|
self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
|
||||||
|
|
||||||
def become_secondary(self):
|
def become_secondary(self):
|
||||||
|
@ -611,6 +617,9 @@ class NodeInstance(object):
|
||||||
lock.release()
|
lock.release()
|
||||||
self.logger.out('Released read lock for synchronization G', state='o')
|
self.logger.out('Released read lock for synchronization G', state='o')
|
||||||
|
|
||||||
|
# Wait 2 seconds for everything to stabilize before we declare all-done
|
||||||
|
time.sleep(2)
|
||||||
|
zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
|
||||||
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
|
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
|
||||||
|
|
||||||
# Flush all VMs on the host
|
# Flush all VMs on the host
|
||||||
|
|
Loading…
Reference in New Issue