Implemented coordinated locked node transitions

The previous method was a "throw it in the sea"-type migration with some
(very arbitrary) sleep statements thrown in for good measure.
Reimplement this with some hard locking. During each phase of the
transition, the nodes acquire read/write shared locks to a Zookeeper key
so that they can tightly coordinate the actions of transferring each
part of the primary state between them. This is done in a subthread to
prevent strange blocking issues that were encountered, likely due to
business in the existing main thread.
This commit is contained in:
Joshua Boniface 2019-12-19 10:45:24 -05:00
parent 0841ddf8b0
commit 8c252aeecc
1 changed files with 321 additions and 125 deletions

View File

@ -118,9 +118,21 @@ class NodeInstance(object):
self.router_state = data self.router_state = data
if self.config['enable_networking']: if self.config['enable_networking']:
if self.router_state == 'primary': if self.router_state == 'primary':
self.become_primary() # Skip becoming primary unless already running
if self.daemon_state == 'run':
self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
#self.become_primary()
transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={})
transition_thread.start()
#transition_thread.join()
else: else:
self.become_secondary() # Skip becoming secondary unless already running
if self.daemon_state == 'run':
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
#self.become_secondary()
transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={})
transition_thread.start()
#transition_thread.join()
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name)) @self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
def watch_node_domainstate(data, stat, event=''): def watch_node_domainstate(data, stat, event=''):
@ -259,41 +271,166 @@ class NodeInstance(object):
def update_domain_list(self, d_domain): def update_domain_list(self, d_domain):
self.d_domain = d_domain self.d_domain = d_domain
# Routing primary/secondary states ######
def become_secondary(self): # Phases of node transition
if self.daemon_state == 'init': #
return # Current Primary Candidate Secondary
# -> secondary -> primary
self.logger.out('Setting router {} to secondary state'.format(self.name), state='i') #
self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i') # def become_secondary() def become_primary()
#
time.sleep(1) # A ----------------------------------------------------------------- SYNC (candidate)
# B ----------------------------------------------------------------- SYNC (current)
if self.config['enable_api']: # 1. Stop client API ||
self.logger.out('Stopping PVC API client service', state='i') # 2. Stop metadata API ||
common.run_os_command("systemctl stop pvc-api.service") # 3. Stop DNS aggregator ||
for network in self.d_network: # 4. Stop DHCP servers ||
self.d_network[network].stopDHCPServer() # 4a) network 1 ||
self.d_network[network].removeGateways() # 4b) network 2 ||
self.dns_aggregator.stop_aggregator() # etc. ||
self.metadata_api.stop() # --
self.removeFloatingAddresses() # C ----------------------------------------------------------------- SYNC (candidate)
# 5. Remove upstream floating IP 1. Add upstream floating IP ||
# --
# D ----------------------------------------------------------------- SYNC (candidate)
# 6. Remove cluster floating IP 2. Add cluster floating IP ||
# --
# E ----------------------------------------------------------------- SYNC (candidate)
# 7. Remove metadata floating IP 3. Add metadata floating IP ||
# --
# F ----------------------------------------------------------------- SYNC (candidate)
# 8. Remove gateway IPs 4. Add gateway IPs ||
# 8a) network 1 4a) network 1 ||
# 8b) network 2 4b) network 2 ||
# etc. etc. ||
# --
# G ----------------------------------------------------------------- SYNC (candidate)
# 5. Transition Patroni primary ||
# 6. Start DHCP servers ||
# 5a) network 1 ||
# 5b) network 2 ||
# etc. ||
# 7. Start DNS aggregator ||
# 8. Start metadata API ||
# 9. Start client API ||
# --
######
def become_primary(self): def become_primary(self):
# Establish a lock """
with zkhandler.writelock(self.zk_conn, '/primary_node'): Acquire primary coordinator status from a peer node
self.logger.out('Setting router {} to primary state'.format(self.name), state='i') """
self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i') # Lock the primary node until transition is complete
primary_lock = zkhandler.writelock(self.zk_conn, '/primary_node')
primary_lock.acquire()
# Create floating addresses # Ensure our lock key is populated
self.createFloatingAddresses() zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
# Start up the gateways and DHCP servers
# Synchronize nodes A (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization A', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization A', state='o')
time.sleep(1) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization A', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization A', state='o')
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization B', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization B', state='o')
self.logger.out('Releasing read lock for synchronization B', state='i')
lock.release()
self.logger.out('Released read lock for synchronization B', state='o')
# Synchronize nodes C (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization C', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization C', state='o')
time.sleep(0.5) # Time for reader to acquire the lock
# 1. Add Upstream floating IP
self.logger.out(
'Creating floating upstream IP {}/{} on interface {}'.format(
self.upstream_ipaddr,
self.upstream_cidrnetmask,
self.upstream_dev
),
state='o'
)
common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)
self.logger.out('Releasing write lock for synchronization C', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization C', state='o')
# Synchronize nodes D (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization D', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization D', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 2. Add Cluster floating IP
self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format(
self.vni_ipaddr,
self.vni_cidrnetmask,
'brcluster'
),
state='o'
)
common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
self.logger.out('Releasing write lock for synchronization D', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization D', state='o')
# Synchronize nodes E (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization E', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization E', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 3. Add Metadata link-local IP
self.logger.out(
'Creating Metadata link-local IP {}/{} on interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.createIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing write lock for synchronization E', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization E', state='o')
# Synchronize nodes F (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization F', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization F', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 4. Add gateway IPs
for network in self.d_network: for network in self.d_network:
self.d_network[network].createGateways() self.d_network[network].createGateways()
self.d_network[network].startDHCPServer() self.logger.out('Releasing write lock for synchronization F', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
time.sleep(1) lock.release()
self.logger.out('Released write lock for synchronization F', state='o')
# Synchronize nodes G (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization G', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization G', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 5. Transition Patroni primary
self.logger.out('Setting Patroni leader to this node', state='i') self.logger.out('Setting Patroni leader to this node', state='i')
tick = 1 tick = 1
# As long as we're primary, keep trying to set the Patroni leader to us # As long as we're primary, keep trying to set the Patroni leader to us
@ -323,88 +460,83 @@ class NodeInstance(object):
break break
# Handle a failed switchover # Handle a failed switchover
elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]): elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]):
self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout, state='e')) if tick > 4:
tick += 1
if tick > 5:
self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e') self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e')
break break
else: else:
self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout), state='e')
tick += 1
time.sleep(5) time.sleep(5)
# Otherwise, we succeeded # Otherwise, we succeeded
else: else:
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o') self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
time.sleep(0.2)
break break
# 6. Start DHCP servers
# Start the DNS aggregator instance for network in self.d_network:
time.sleep(1) self.d_network[network].startDHCPServer()
# 7. Start DNS aggregator
self.dns_aggregator.start_aggregator() self.dns_aggregator.start_aggregator()
# 8. Start metadata API
self.metadata_api.start() self.metadata_api.start()
# 9. Start client API (and provisioner worker)
# Start the clients
if self.config['enable_api']: if self.config['enable_api']:
self.logger.out('Starting PVC API client service', state='i') self.logger.out('Stopping PVC API client service', state='i')
common.run_os_command("systemctl start pvc-api.service") common.run_os_command("systemctl start pvc-api.service")
self.logger.out('Starting PVC Provisioner Worker service', state='i') self.logger.out('Starting PVC Provisioner Worker service', state='i')
common.run_os_command("systemctl start pvc-provisioner-worker.service") common.run_os_command("systemctl start pvc-provisioner-worker.service")
self.logger.out('Releasing write lock for synchronization G', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization G', state='o')
def createFloatingAddresses(self): primary_lock.release()
# Metadata link-local IP self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
self.logger.out(
'Creating Metadata link-local IP {}/{} on interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.createIPAddress('169.254.169.254', '32', 'lo')
# VNI floating IP def become_secondary(self):
self.logger.out( """
'Creating floating management IP {}/{} on interface {}'.format( Relinquish primary coordinator status to a peer node
self.vni_ipaddr, """
self.vni_cidrnetmask, time.sleep(0.2) # Initial delay for the first writer to grab the lock
'brcluster'
),
state='o'
)
common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
# Upstream floating IP # Synchronize nodes A (I am reader)
self.logger.out( lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
'Creating floating upstream IP {}/{} on interface {}'.format( self.logger.out('Acquiring read lock for synchronization A', state='i')
self.upstream_ipaddr, lock.acquire()
self.upstream_cidrnetmask, self.logger.out('Acquired read lock for synchronization A', state='o')
self.upstream_dev self.logger.out('Releasing read lock for synchronization A', state='i')
), lock.release()
state='o' self.logger.out('Released read lock for synchronization A', state='o')
)
common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)
def removeFloatingAddresses(self): # Synchronize nodes B (I am writer)
# Metadata link-local IP lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out( self.logger.out('Acquiring write lock for synchronization B', state='i')
'Removing Metadata link-local IP {}/{} from interface {}'.format( lock.acquire()
'169.254.169.254', self.logger.out('Acquired write lock for synchronization B', state='o')
'32', time.sleep(0.2) # Time for reader to acquire the lock
'lo' # 1. Stop client API
), if self.config['enable_api']:
state='o' self.logger.out('Stopping PVC API client service', state='i')
) common.run_os_command("systemctl stop pvc-api.service")
common.removeIPAddress('169.254.169.254', '32', 'lo') # 2. Stop metadata API
self.metadata_api.stop()
# 3. Stop DNS aggregator
self.dns_aggregator.stop_aggregator()
# 4. Stop DHCP servers
for network in self.d_network:
self.d_network[network].stopDHCPServer()
self.logger.out('Releasing write lock for synchronization B', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization B', state='o')
time.sleep(0.1) # Time for new writer to acquire the lock
# VNI floating IP # Synchronize nodes C (I am reader)
self.logger.out( lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
'Removing floating management IP {}/{} from interface {}'.format( self.logger.out('Acquiring read lock for synchronization C', state='i')
self.vni_ipaddr, lock.acquire()
self.vni_cidrnetmask, self.logger.out('Acquired read lock for synchronization C', state='o')
'brcluster' # 5. Remove Upstream floating IP
),
state='o'
)
common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
# Upstream floating IP
self.logger.out( self.logger.out(
'Removing floating upstream IP {}/{} from interface {}'.format( 'Removing floating upstream IP {}/{} from interface {}'.format(
self.upstream_ipaddr, self.upstream_ipaddr,
@ -414,6 +546,70 @@ class NodeInstance(object):
state='o' state='o'
) )
common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev) common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)
self.logger.out('Releasing read lock for synchronization C', state='i')
lock.release()
self.logger.out('Released read lock for synchronization C', state='o')
# Synchronize nodes D (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization D', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization D', state='o')
# 6. Remove Cluster floating IP
self.logger.out(
'Removing floating management IP {}/{} from interface {}'.format(
self.vni_ipaddr,
self.vni_cidrnetmask,
'brcluster'
),
state='o'
)
common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
self.logger.out('Releasing read lock for synchronization D', state='i')
lock.release()
self.logger.out('Released read lock for synchronization D', state='o')
# Synchronize nodes E (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization E', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization E', state='o')
# 7. Remove Metadata link-local IP
self.logger.out(
'Removing Metadata link-local IP {}/{} from interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.removeIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing read lock for synchronization E', state='i')
lock.release()
self.logger.out('Released read lock for synchronization E', state='o')
# Synchronize nodes F (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization F', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization F', state='o')
# 8. Remove gateway IPs
for network in self.d_network:
self.d_network[network].removeGateways()
self.logger.out('Releasing read lock for synchronization F', state='i')
lock.release()
self.logger.out('Released read lock for synchronization F', state='o')
# Synchronize nodes G (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization G', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization G', state='o')
self.logger.out('Releasing read lock for synchronization G', state='i')
lock.release()
self.logger.out('Released read lock for synchronization G', state='o')
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
# Flush all VMs on the host # Flush all VMs on the host
def flush(self): def flush(self):