diff --git a/node-daemon/pvcnoded/DNSAggregatorInstance.py b/node-daemon/pvcnoded/DNSAggregatorInstance.py
index d7dede44..46488512 100644
--- a/node-daemon/pvcnoded/DNSAggregatorInstance.py
+++ b/node-daemon/pvcnoded/DNSAggregatorInstance.py
@@ -76,8 +76,8 @@ class PowerDNSInstance(object):
         self.dns_server_daemon = None
 
         # Floating upstreams
-        self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
-        self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
+        self.vni_floatingipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
+        self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
 
     def start(self):
         self.logger.out(
@@ -93,7 +93,7 @@ class PowerDNSInstance(object):
            '--disable-syslog=yes',                 # Log only to stdout (which is then captured)
            '--disable-axfr=no',                    # Allow AXFRs
            '--allow-axfr-ips=0.0.0.0/0',           # Allow AXFRs to anywhere
-           '--local-address={},{}'.format(self.vni_ipaddr, self.upstream_ipaddr),  # Listen on floating IPs
+           '--local-address={},{}'.format(self.vni_floatingipaddr, self.upstream_floatingipaddr),  # Listen on floating IPs
            '--local-port=53',                      # On port 53
            '--log-dns-details=on',                 # Log details
            '--loglevel=3',                         # Log info
diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index 5cc9fc8f..b5ebadd0 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -1356,9 +1356,11 @@ def collect_vm_stats(queue):
         if instance.getdom() is not None:
             try:
                 if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+                    logger.out("VM {} has failed".format(instance.domname), state='w', prefix='vm-thread')
                     raise
             except Exception:
                 # Toggle a state "change"
+                logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
                 zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(domain): instance.getstate()})
         elif instance.getnode() == this_node.name:
             memprov += instance.getmemory()
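A note on the `collect_vm_stats` hunk above: the bare `raise` exists purely to drop into the `except` block, which writes the VM's current state back to its own Zookeeper key. The value does not change, but the write still bumps the znode version and re-fires the data watchers, forcing the state machine to re-evaluate the VM. A minimal standalone sketch of that re-trigger behaviour, assuming a local Zookeeper and a hypothetical znode path (kazoo being the client library that PVC's zkhandler wraps):

```python
# Sketch only: hypothetical znode path, local Zookeeper assumed
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

zk.ensure_path('/domains/test/state')
zk.set('/domains/test/state', b'start')

@zk.DataWatch('/domains/test/state')
def watch_state(data, stat):
    # Fires once at registration, then again on every set(),
    # even when the written bytes are identical
    print('watch fired: {} (znode version {})'.format(data, stat.version))

zk.set('/domains/test/state', b'start')  # same value; the watch still fires

zk.stop()
```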
diff --git a/node-daemon/pvcnoded/NodeInstance.py b/node-daemon/pvcnoded/NodeInstance.py
index 882e7785..3ccf6f35 100644
--- a/node-daemon/pvcnoded/NodeInstance.py
+++ b/node-daemon/pvcnoded/NodeInstance.py
@@ -64,17 +64,28 @@ class NodeInstance(object):
         self.vcpualloc = 0
         # Floating IP configurations
         if self.config['enable_networking']:
-            self.vni_dev = self.config['vni_dev']
-            self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
             self.upstream_dev = self.config['upstream_dev']
-            self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
+            self.upstream_floatingipaddr = self.config['upstream_floating_ip'].split('/')[0]
+            self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_dev_ip'].split('/')
+            self.vni_dev = self.config['vni_dev']
+            self.vni_floatingipaddr = self.config['vni_floating_ip'].split('/')[0]
+            self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_dev_ip'].split('/')
+            self.storage_dev = self.config['storage_dev']
+            self.storage_floatingipaddr = self.config['storage_floating_ip'].split('/')[0]
+            self.storage_ipaddr, self.storage_cidrnetmask = self.config['storage_dev_ip'].split('/')
         else:
-            self.vni_dev = None
-            self.vni_ipaddr = None
-            self.vni_cidrnetmask = None
             self.upstream_dev = None
+            self.upstream_floatingipaddr = None
             self.upstream_ipaddr = None
             self.upstream_cidrnetmask = None
+            self.vni_dev = None
+            self.vni_floatingipaddr = None
+            self.vni_ipaddr = None
+            self.vni_cidrnetmask = None
+            self.storage_dev = None
+            self.storage_floatingipaddr = None
+            self.storage_ipaddr = None
+            self.storage_cidrnetmask = None
         # Threads
         self.flush_thread = None
         # Flags
@@ -349,13 +360,13 @@ class NodeInstance(object):
         # 1. Add Upstream floating IP
         self.logger.out(
             'Creating floating upstream IP {}/{} on interface {}'.format(
-                self.upstream_ipaddr,
+                self.upstream_floatingipaddr,
                 self.upstream_cidrnetmask,
                 'brupstream'
             ),
             state='o'
         )
-        common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
+        common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing write lock for synchronization phase C', state='i')
         zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
         lock.release()
@@ -367,16 +378,25 @@ class NodeInstance(object):
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase D', state='o')
         time.sleep(0.2)  # Time fir reader to acquire the lock
-        # 2. Add Cluster floating IP
+        # 2. Add Cluster & Storage floating IP
         self.logger.out(
             'Creating floating management IP {}/{} on interface {}'.format(
-                self.vni_ipaddr,
+                self.vni_floatingipaddr,
                 self.vni_cidrnetmask,
                 'brcluster'
             ),
             state='o'
         )
-        common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        common.createIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
+        self.logger.out(
+            'Creating floating storage IP {}/{} on interface {}'.format(
+                self.storage_floatingipaddr,
+                self.storage_cidrnetmask,
+                'brstorage'
+            ),
+            state='o'
+        )
+        common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
         self.logger.out('Releasing write lock for synchronization phase D', state='i')
         zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
         lock.release()
@@ -541,13 +561,13 @@ class NodeInstance(object):
         # 5. Remove Upstream floating IP
         self.logger.out(
             'Removing floating upstream IP {}/{} from interface {}'.format(
-                self.upstream_ipaddr,
+                self.upstream_floatingipaddr,
                 self.upstream_cidrnetmask,
                 'brupstream'
             ),
             state='o'
         )
-        common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
+        common.removeIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing read lock for synchronization phase C', state='i')
         lock.release()
         self.logger.out('Released read lock for synchronization phase C', state='o')
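The `__init__` hunk above also changes the parsing convention: each of the three networks now carries a per-device IP (`*_dev_ip`, split into address and CIDR prefix) plus a floating IP, from which only the address portion is kept. A small sketch with hypothetical config values (the real ones come from the node daemon configuration):

```python
# Hypothetical example values for illustration only
config = {
    'upstream_floating_ip': '10.0.0.250/24',
    'upstream_dev_ip': '10.0.0.1/24',
}

# Floating IP: keep the address only
upstream_floatingipaddr = config['upstream_floating_ip'].split('/')[0]

# Device IP: keep both the address and the CIDR prefix
upstream_ipaddr, upstream_cidrnetmask = config['upstream_dev_ip'].split('/')

print(upstream_floatingipaddr)                # 10.0.0.250
print(upstream_ipaddr, upstream_cidrnetmask)  # 10.0.0.1 24
```

This avoids carrying a second netmask per network: when the floating address is created on a bridge, the prefix reused is the one parsed from the corresponding `*_dev_ip`.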
@@ -557,16 +577,25 @@ class NodeInstance(object):
         self.logger.out('Acquiring read lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase D', state='o')
-        # 6. Remove Cluster floating IP
+        # 6. Remove Cluster & Storage floating IP
         self.logger.out(
             'Removing floating management IP {}/{} from interface {}'.format(
-                self.vni_ipaddr,
+                self.vni_floatingipaddr,
                 self.vni_cidrnetmask,
                 'brcluster'
             ),
             state='o'
         )
-        common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        common.removeIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
+        self.logger.out(
+            'Removing floating storage IP {}/{} from interface {}'.format(
+                self.storage_floatingipaddr,
+                self.storage_cidrnetmask,
+                'brstorage'
+            ),
+            state='o'
+        )
+        common.removeIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
         self.logger.out('Releasing read lock for synchronization phase D', state='i')
         lock.release()
         self.logger.out('Released read lock for synchronization phase D', state='o')
diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py
index 56713418..ca04353c 100644
--- a/node-daemon/pvcnoded/VMInstance.py
+++ b/node-daemon/pvcnoded/VMInstance.py
@@ -35,7 +35,7 @@ import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
 import daemon_lib.common as daemon_common
 
 
-def flush_locks(zk_conn, logger, dom_uuid):
+def flush_locks(zk_conn, logger, dom_uuid, this_node=None):
     logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
     # Get the list of RBD images
     rbd_list = zkhandler.readdata(zk_conn, '/domains/{}/rbdlist'.format(dom_uuid)).split(',')
@@ -56,10 +56,13 @@ def flush_locks(zk_conn, logger, dom_uuid):
         if lock_list:
             # Loop through the locks
             for lock in lock_list:
+                if this_node is not None and lock['address'].split(':')[0] != this_node.storage_ipaddr:
+                    logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0]), state='e')
+                    continue
                 # Free the lock
                 lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
                 if lock_remove_retcode != 0:
-                    logger.out('Failed to free RBD lock "{}" on volume "{}"\n{}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
+                    logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
                     continue
                 logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
 
@@ -74,15 +77,14 @@ def run_command(zk_conn, logger, this_node, data):
     # Flushing VM RBD locks
     if command == 'flush_locks':
         dom_uuid = args
-        # If this node is taking over primary state, wait until it's done
-        while this_node.router_state == 'takeover':
-            time.sleep(1)
-        if this_node.router_state == 'primary':
+
+        # Verify that the VM is set to run on this node
+        if this_node.d_domain[dom_uuid].getnode() == this_node.name:
             # Lock the command queue
             zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains')
             with zk_lock:
-                # Add the OSD
-                result = flush_locks(zk_conn, logger, dom_uuid)
+                # Flush the lock
+                result = flush_locks(zk_conn, logger, dom_uuid, this_node)
                 # Command succeeded
                 if result:
                     # Update the command queue
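The new guard in `flush_locks` compares each lock owner's address against this node's `storage_ipaddr` before freeing anything, so a node can no longer clear a lock still held by a live peer. A standalone sketch of the check, using a hypothetical lock entry in the `id`/`locker`/`address` shape the surrounding code reads from `rbd lock list` (the watcher address is `ip:port/nonce`, so `split(':')[0]` recovers the owner IP):

```python
def lock_is_local(lock, storage_ipaddr):
    """Return True only when the lock owner address matches this node."""
    owner_ip = lock['address'].split(':')[0]
    return owner_ip == storage_ipaddr

# Hypothetical example entry
lock = {
    'id': 'auto 140234157039680',
    'locker': 'client.4123',
    'address': '10.100.0.1:0/2076588905',
}

print(lock_is_local(lock, '10.100.0.1'))  # True: safe for this node to free
print(lock_is_local(lock, '10.100.0.2'))  # False: owned elsewhere, skip it
```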
@@ -225,6 +227,12 @@ class VMInstance(object):
         except Exception:
             curstate = 'notstart'
 
+        # Handle situations where the VM crashed or the node unexpectedly rebooted
+        if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+            # Flush locks
+            self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
+            flush_locks(self.zk_conn, self.logger, self.domuuid, self.this_node)
+
         if curstate == libvirt.VIR_DOMAIN_RUNNING:
             # If it is running just update the model
             self.addDomainToList()
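The final hunk makes the state-management path in `VMInstance` flush stale RBD locks whenever the domain is missing or not running before a start is attempted, covering VM crashes and unexpected node reboots. A minimal sketch of the liveness test it relies on, using the libvirt Python bindings and a hypothetical domain UUID:

```python
# Sketch only: hypothetical UUID, local libvirtd assumed
import libvirt

conn = libvirt.open('qemu:///system')
try:
    dom = conn.lookupByUUIDString('00000000-0000-0000-0000-000000000000')
    # state() returns [state, reason]; only VIR_DOMAIN_RUNNING counts as live
    running = dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING
except libvirt.libvirtError:
    # Domain is not defined here at all: the getdom() is None case above
    running = False

print('needs lock flush before start:', not running)
conn.close()
```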