Safely reset RBD locks on failed VMs

This should correct issues both on cold start and when a VM crashes
uncleanly, either of which can leave stale RBD locks that prevent the VM
from starting.

This implementation has four parts:
  1. Update how IP addresses are handled: replace all previous instances of
  "vni_ipaddr" with "vni_floatingipaddr", then re-add "vni_ipaddr" holding
  the real data for this node's own IPs. Also include the storage IPs,
  which were not tracked before, so each this_node now carries its local
  IPs as well as the floating IPs. This enables the next two steps.
  2. Modify flush_locks to take this_node as an argument, and update the
  run_command function to operate only against this node rather than on
  the primary coordinator.
  3. Have flush_locks check each lock against the current node, to verify
  that the lock is actually held by that node; this is the only way to
  free locks safely (a minimal illustrative sketch of the check follows
  this list). During fencing, we bypass this check by not passing a
  this_node.
  4. Have the VM start path check for VM failure/unclean startup and
  execute a flush_locks before actually starting the VM.
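
As a rough illustration of the ownership check described in part 3, the
following is a minimal sketch and not the code in this commit: it assumes
the same lock-list shape the diff below relies on (a list of objects with
"id", "locker" and "address" keys from "rbd lock list --format json"), and
the function name flush_rbd_locks and the node_storage_ip parameter are
purely illustrative. Passing no IP (as during fencing) removes every lock.

# Minimal sketch of a node-aware RBD lock flush; names are illustrative,
# not the PVC implementation.
import json
import subprocess


def flush_rbd_locks(volume, node_storage_ip=None):
    # List the current locks on the RBD volume as JSON
    proc = subprocess.run(
        ['rbd', 'lock', 'list', '--format', 'json', volume],
        capture_output=True, text=True
    )
    if proc.returncode != 0:
        return False
    locks = json.loads(proc.stdout or '[]')

    for lock in locks:
        # Lock addresses look like "10.0.0.1:0/12345"; compare only the IP part
        owner_ip = lock['address'].split(':')[0]
        if node_storage_ip is not None and owner_ip != node_storage_ip:
            # Held by another (possibly live) node; freeing it would be unsafe
            continue
        # This node owns the lock (or the check was bypassed), so free it
        subprocess.run(['rbd', 'lock', 'remove', volume, lock['id'], lock['locker']])
    return True

Keying the comparison on the storage-network IP is why part 1 starts
tracking the storage IPs per node in the first place.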
Joshua Boniface 2020-12-14 14:39:51 -05:00
parent 68d87c0b99
commit 7c99a7bda7
4 changed files with 66 additions and 27 deletions


@@ -76,8 +76,8 @@ class PowerDNSInstance(object):
         self.dns_server_daemon = None
 
         # Floating upstreams
-        self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
-        self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
+        self.vni_floatingipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
+        self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
 
     def start(self):
         self.logger.out(
@@ -93,7 +93,7 @@ class PowerDNSInstance(object):
             '--disable-syslog=yes',                    # Log only to stdout (which is then captured)
             '--disable-axfr=no',                       # Allow AXFRs
             '--allow-axfr-ips=0.0.0.0/0',              # Allow AXFRs to anywhere
-            '--local-address={},{}'.format(self.vni_ipaddr, self.upstream_ipaddr),  # Listen on floating IPs
+            '--local-address={},{}'.format(self.vni_floatingipaddr, self.upstream_floatingipaddr),  # Listen on floating IPs
             '--local-port=53',                         # On port 53
             '--log-dns-details=on',                    # Log details
             '--loglevel=3',                            # Log info


@@ -1356,9 +1356,11 @@ def collect_vm_stats(queue):
         if instance.getdom() is not None:
             try:
                 if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+                    logger.out("VM {} has failed".format(instance.domname), state='w', prefix='vm-thread')
                     raise
             except Exception:
                 # Toggle a state "change"
+                logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
                 zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(domain): instance.getstate()})
         elif instance.getnode() == this_node.name:
             memprov += instance.getmemory()


@@ -64,17 +64,28 @@ class NodeInstance(object):
         self.vcpualloc = 0
         # Floating IP configurations
         if self.config['enable_networking']:
-            self.vni_dev = self.config['vni_dev']
-            self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
             self.upstream_dev = self.config['upstream_dev']
-            self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
+            self.upstream_floatingipaddr = self.config['upstream_floating_ip'].split('/')[0]
+            self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_dev_ip'].split('/')
+            self.vni_dev = self.config['vni_dev']
+            self.vni_floatingipaddr = self.config['vni_floating_ip'].split('/')[0]
+            self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_dev_ip'].split('/')
+            self.storage_dev = self.config['storage_dev']
+            self.storage_floatingipaddr = self.config['storage_floating_ip'].split('/')[0]
+            self.storage_ipaddr, self.storage_cidrnetmask = self.config['storage_dev_ip'].split('/')
         else:
-            self.vni_dev = None
-            self.vni_ipaddr = None
-            self.vni_cidrnetmask = None
             self.upstream_dev = None
+            self.upstream_floatingipaddr = None
             self.upstream_ipaddr = None
             self.upstream_cidrnetmask = None
+            self.vni_dev = None
+            self.vni_floatingipaddr = None
+            self.vni_ipaddr = None
+            self.vni_cidrnetmask = None
+            self.storage_dev = None
+            self.storage_floatingipaddr = None
+            self.storage_ipaddr = None
+            self.storage_cidrnetmask = None
         # Threads
         self.flush_thread = None
         # Flags
@@ -349,13 +360,13 @@ class NodeInstance(object):
         # 1. Add Upstream floating IP
         self.logger.out(
             'Creating floating upstream IP {}/{} on interface {}'.format(
-                self.upstream_ipaddr,
+                self.upstream_floatingipaddr,
                 self.upstream_cidrnetmask,
                 'brupstream'
             ),
             state='o'
         )
-        common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
+        common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing write lock for synchronization phase C', state='i')
         zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
         lock.release()
@ -367,16 +378,25 @@ class NodeInstance(object):
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o') self.logger.out('Acquired write lock for synchronization phase D', state='o')
time.sleep(0.2) # Time fir reader to acquire the lock time.sleep(0.2) # Time fir reader to acquire the lock
# 2. Add Cluster floating IP # 2. Add Cluster & Storage floating IP
self.logger.out( self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format( 'Creating floating management IP {}/{} on interface {}'.format(
self.vni_ipaddr, self.vni_floatingipaddr,
self.vni_cidrnetmask, self.vni_cidrnetmask,
'brcluster' 'brcluster'
), ),
state='o' state='o'
) )
common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster') common.createIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format(
self.storage_floatingipaddr,
self.storage_cidrnetmask,
'brcluster'
),
state='o'
)
common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
self.logger.out('Releasing write lock for synchronization phase D', state='i') self.logger.out('Releasing write lock for synchronization phase D', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''}) zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release() lock.release()
@@ -541,13 +561,13 @@ class NodeInstance(object):
         # 5. Remove Upstream floating IP
         self.logger.out(
             'Removing floating upstream IP {}/{} from interface {}'.format(
-                self.upstream_ipaddr,
+                self.upstream_floatingipaddr,
                 self.upstream_cidrnetmask,
                 'brupstream'
             ),
             state='o'
         )
-        common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
+        common.removeIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing read lock for synchronization phase C', state='i')
         lock.release()
         self.logger.out('Released read lock for synchronization phase C', state='o')
@@ -557,16 +577,25 @@ class NodeInstance(object):
         self.logger.out('Acquiring read lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase D', state='o')
-        # 6. Remove Cluster floating IP
+        # 6. Remove Cluster & Storage floating IP
         self.logger.out(
             'Removing floating management IP {}/{} from interface {}'.format(
-                self.vni_ipaddr,
+                self.vni_floatingipaddr,
                 self.vni_cidrnetmask,
                 'brcluster'
             ),
             state='o'
         )
-        common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        common.removeIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
+        self.logger.out(
+            'Removing floating management IP {}/{} from interface {}'.format(
+                self.storage_floatingipaddr,
+                self.storage_cidrnetmask,
+                'brcluster'
+            ),
+            state='o'
+        )
+        common.removeIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
         self.logger.out('Releasing read lock for synchronization phase D', state='i')
         lock.release()
         self.logger.out('Released read lock for synchronization phase D', state='o')


@@ -35,7 +35,7 @@ import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
 import daemon_lib.common as daemon_common
 
 
-def flush_locks(zk_conn, logger, dom_uuid):
+def flush_locks(zk_conn, logger, dom_uuid, this_node=None):
     logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
     # Get the list of RBD images
     rbd_list = zkhandler.readdata(zk_conn, '/domains/{}/rbdlist'.format(dom_uuid)).split(',')
@@ -56,10 +56,13 @@ def flush_locks(zk_conn, logger, dom_uuid):
         if lock_list:
             # Loop through the locks
             for lock in lock_list:
+                if this_node is not None and lock['address'].split(':')[0] != this_node.storage_ipaddr:
+                    logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0]), state='e')
+                    continue
                 # Free the lock
                 lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
                 if lock_remove_retcode != 0:
-                    logger.out('Failed to free RBD lock "{}" on volume "{}"\n{}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
+                    logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
                     continue
                 logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
@@ -74,15 +77,14 @@ def run_command(zk_conn, logger, this_node, data):
     # Flushing VM RBD locks
     if command == 'flush_locks':
         dom_uuid = args
-        # If this node is taking over primary state, wait until it's done
-        while this_node.router_state == 'takeover':
-            time.sleep(1)
-        if this_node.router_state == 'primary':
+
+        # Verify that the VM is set to run on this node
+        if this_node.d_domain[dom_uuid].getnode() == this_node.name:
             # Lock the command queue
             zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains')
             with zk_lock:
-                # Add the OSD
-                result = flush_locks(zk_conn, logger, dom_uuid)
+                # Flush the lock
+                result = flush_locks(zk_conn, logger, dom_uuid, this_node)
                 # Command succeeded
                 if result:
                     # Update the command queue
@@ -225,6 +227,12 @@ class VMInstance(object):
         except Exception:
             curstate = 'notstart'
 
+        # Handle situations where the VM crashed or the node unexpectedly rebooted
+        if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+            # Flush locks
+            self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
+            flush_locks(self.zk_conn, self.logger, self.domuuid, self.this_node)
+
         if curstate == libvirt.VIR_DOMAIN_RUNNING:
             # If it is running just update the model
             self.addDomainToList()