Lock primary_node key during primary switchover
Also implements a looping to switch over the Patroni leader to ensure this always follows the primary and clean up the code around here a bit.
This commit is contained in:
parent
710d2cf9c2
commit
a329376d33
|
@ -115,10 +115,11 @@ class NodeInstance(object):
|
|||
# We're a coordinator so we care about networking
|
||||
if data != self.router_state:
|
||||
self.router_state = data
|
||||
if self.router_state == 'primary':
|
||||
self.become_primary()
|
||||
else:
|
||||
self.become_secondary()
|
||||
if self.config['enable_networking']:
|
||||
if self.router_state == 'primary':
|
||||
self.become_primary()
|
||||
else:
|
||||
self.become_secondary()
|
||||
|
||||
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
|
||||
def watch_node_domainstate(data, stat, event=''):
|
||||
|
@ -259,22 +260,24 @@ class NodeInstance(object):
|
|||
|
||||
# Routing primary/secondary states
|
||||
def become_secondary(self):
|
||||
if self.config['enable_networking']:
|
||||
self.logger.out('Setting router {} to secondary state'.format(self.name), state='i')
|
||||
self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
|
||||
time.sleep(2)
|
||||
if self.config['enable_api']:
|
||||
self.logger.out('Stopping PVC API client service', state='i')
|
||||
common.run_os_command("systemctl stop pvc-api.service")
|
||||
for network in self.d_network:
|
||||
self.d_network[network].stopDHCPServer()
|
||||
self.d_network[network].removeGateways()
|
||||
self.removeFloatingAddresses()
|
||||
self.dns_aggregator.stop_aggregator()
|
||||
self.logger.out('Setting router {} to secondary state'.format(self.name), state='i')
|
||||
self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
|
||||
time.sleep(2)
|
||||
if self.config['enable_api']:
|
||||
self.logger.out('Stopping PVC API client service', state='i')
|
||||
common.run_os_command("systemctl stop pvc-api.service")
|
||||
for network in self.d_network:
|
||||
self.d_network[network].stopDHCPServer()
|
||||
self.d_network[network].removeGateways()
|
||||
self.removeFloatingAddresses()
|
||||
self.dns_aggregator.stop_aggregator()
|
||||
|
||||
def become_primary(self):
|
||||
if self.config['enable_networking']:
|
||||
# Establish a lock
|
||||
with zkhandler.writelock(self.zk_conn, '/primary_node'):
|
||||
self.logger.out('Setting router {} to primary state'.format(self.name), state='i')
|
||||
|
||||
# Create floating addresses
|
||||
self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
|
||||
self.createFloatingAddresses()
|
||||
# Start up the gateways and DHCP servers
|
||||
|
@ -285,25 +288,30 @@ class NodeInstance(object):
|
|||
self.logger.out('Starting PVC API client service', state='i')
|
||||
common.run_os_command("systemctl start pvc-api.service")
|
||||
time.sleep(1)
|
||||
# Force Patroni to switch to the local instance
|
||||
|
||||
# Switch Patroni leader to the local instance
|
||||
self.logger.out('Setting Patroni leader to this node', state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"""
|
||||
patronictl
|
||||
-c /etc/patroni/config.yml
|
||||
-d zookeeper://localhost:2181
|
||||
switchover
|
||||
--candidate {}
|
||||
--force
|
||||
pvcdns
|
||||
""".format(self.name)
|
||||
)
|
||||
if stdout:
|
||||
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
|
||||
else:
|
||||
self.logger.out('Failed to switch Patroni leader\n{}'.format(stderr), state='e')
|
||||
time.sleep(1)
|
||||
while True:
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"""
|
||||
patronictl
|
||||
-c /etc/patroni/config.yml
|
||||
-d zookeeper://localhost:2181
|
||||
switchover
|
||||
--candidate {}
|
||||
--force
|
||||
pvcdns
|
||||
""".format(self.name)
|
||||
)
|
||||
if stdout:
|
||||
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
|
||||
break
|
||||
else:
|
||||
self.logger.out('Failed to switch Patroni leader; retrying\n{}'.format(stderr), state='e')
|
||||
time.sleep(2)
|
||||
|
||||
# Start the DNS aggregator instance
|
||||
time.sleep(1)
|
||||
self.dns_aggregator.start_aggregator()
|
||||
|
||||
def createFloatingAddresses(self):
|
||||
|
|
Loading…
Reference in New Issue