Convert VMInstance to new zkhandler

This commit is contained in:
Joshua Boniface 2021-06-01 11:46:27 -04:00
parent 8a4a41e092
commit 790098f181
3 changed files with 138 additions and 87 deletions

View File

@ -963,7 +963,7 @@ if enable_hypervisor:
@zkhandler.zk_conn.DataWatch('/cmd/domains') @zkhandler.zk_conn.DataWatch('/cmd/domains')
def cmd_domains(data, stat, event=''): def cmd_domains(data, stat, event=''):
if data: if data:
VMInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii')) VMInstance.run_command(zkhandler, logger, this_node, data.decode('ascii'))
# VM domain objects # VM domain objects
@zkhandler.zk_conn.ChildrenWatch('/domains') @zkhandler.zk_conn.ChildrenWatch('/domains')
@ -973,7 +973,7 @@ if enable_hypervisor:
# Add any missing domains to the list # Add any missing domains to the list
for domain in new_domain_list: for domain in new_domain_list:
if domain not in domain_list: if domain not in domain_list:
d_domain[domain] = VMInstance.VMInstance(domain, zkhandler.zk_conn, config, logger, this_node) d_domain[domain] = VMInstance.VMInstance(domain, zkhandler, config, logger, this_node)
# Remove any deleted domains from the list # Remove any deleted domains from the list
for domain in domain_list: for domain in domain_list:

View File

@ -25,15 +25,13 @@ import time
from threading import Thread, Event from threading import Thread, Event
from collections import deque from collections import deque
import pvcnoded.zkhandler as zkhandler
class VMConsoleWatcherInstance(object): class VMConsoleWatcherInstance(object):
# Initialization function # Initialization function
def __init__(self, domuuid, domname, zk_conn, config, logger, this_node): def __init__(self, domuuid, domname, zkhandler, config, logger, this_node):
self.domuuid = domuuid self.domuuid = domuuid
self.domname = domname self.domname = domname
self.zk_conn = zk_conn self.zkhandler = zkhandler
self.config = config self.config = config
self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname) self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
self.console_log_lines = config['console_log_lines'] self.console_log_lines = config['console_log_lines']
@ -93,7 +91,9 @@ class VMConsoleWatcherInstance(object):
self.fetch_lines() self.fetch_lines()
# Update Zookeeper with the new loglines if they changed # Update Zookeeper with the new loglines if they changed
if self.loglines != self.last_loglines: if self.loglines != self.last_loglines:
zkhandler.writedata(self.zk_conn, {'/domains/{}/consolelog'.format(self.domuuid): self.loglines}) self.zkhandler.write([
('/domains/{}/consolelog'.format(self.domuuid), self.loglines)
])
self.last_loglines = self.loglines self.last_loglines = self.loglines
def fetch_lines(self): def fetch_lines(self):

View File

@ -28,7 +28,6 @@ from threading import Thread
from xml.etree import ElementTree from xml.etree import ElementTree
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common import pvcnoded.common as common
import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
@ -36,10 +35,10 @@ import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import daemon_lib.common as daemon_common import daemon_lib.common as daemon_common
def flush_locks(zk_conn, logger, dom_uuid, this_node=None): def flush_locks(zkhandler, logger, dom_uuid, this_node=None):
logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i') logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
# Get the list of RBD images # Get the list of RBD images
rbd_list = zkhandler.readdata(zk_conn, '/domains/{}/rbdlist'.format(dom_uuid)).split(',') rbd_list = zkhandler.read('/domains/{}/rbdlist'.format(dom_uuid)).split(',')
for rbd in rbd_list: for rbd in rbd_list:
# Check if a lock exists # Check if a lock exists
lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd)) lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
@ -57,17 +56,21 @@ def flush_locks(zk_conn, logger, dom_uuid, this_node=None):
if lock_list: if lock_list:
# Loop through the locks # Loop through the locks
for lock in lock_list: for lock in lock_list:
if this_node is not None and zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr: if this_node is not None and zkhandler.read('/domains/{}/state'.format(dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr:
logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e')) logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e'))
zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'fail'}) zkhandler.write([
zkhandler.writedata(zk_conn, {'/domains/{}/failedreason'.format(dom_uuid): 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)}) ('/domains/{}/state'.format(dom_uuid), 'fail'),
('/domains/{}/failedreason'.format(dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd))
])
break break
# Free the lock # Free the lock
lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker'])) lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
if lock_remove_retcode != 0: if lock_remove_retcode != 0:
logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e') logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'fail'}) zkhandler.write([
zkhandler.writedata(zk_conn, {'/domains/{}/failedreason'.format(dom_uuid): 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)}) ('/domains/{}/state'.format(dom_uuid), 'fail'),
('/domains/{}/failedreason'.format(dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr))
])
break break
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o') logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
@ -75,7 +78,7 @@ def flush_locks(zk_conn, logger, dom_uuid, this_node=None):
# Primary command function # Primary command function
def run_command(zk_conn, logger, this_node, data): def run_command(zkhandler, logger, this_node, data):
# Get the command and args # Get the command and args
command, args = data.split() command, args = data.split()
@ -86,45 +89,49 @@ def run_command(zk_conn, logger, this_node, data):
# Verify that the VM is set to run on this node # Verify that the VM is set to run on this node
if this_node.d_domain[dom_uuid].getnode() == this_node.name: if this_node.d_domain[dom_uuid].getnode() == this_node.name:
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains') zk_lock = zkhandler.writelock('/cmd/domains')
with zk_lock: with zk_lock:
# Flush the lock # Flush the lock
result = flush_locks(zk_conn, logger, dom_uuid, this_node) result = flush_locks(zkhandler, logger, dom_uuid, this_node)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/domains': 'success-{}'.format(data)}) zkhandler.write([
('/cmd/domains', 'success-{}'.format(data))
])
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/domains': 'failure-{}'.format(data)}) zkhandler.write([
('/cmd/domains', 'failure-{}'.format(data))
])
# Wait 1 second before we free the lock, to ensure the client hits the lock # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
class VMInstance(object): class VMInstance(object):
# Initialization function # Initialization function
def __init__(self, domuuid, zk_conn, config, logger, this_node): def __init__(self, domuuid, zkhandler, config, logger, this_node):
# Passed-in variables on creation # Passed-in variables on creation
self.domuuid = domuuid self.domuuid = domuuid
self.zk_conn = zk_conn self.zkhandler = zkhandler
self.config = config self.config = config
self.logger = logger self.logger = logger
self.this_node = this_node self.this_node = this_node
# Get data from zookeeper # Get data from zookeeper
self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid)) self.domname = self.zkhandler.read('/domains/{}'.format(domuuid))
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid))
self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
self.lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid))
self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid))
try: try:
self.pinpolicy = zkhandler.readdata(self.zk_conn, '/domains/{}/pinpolicy'.format(self.domuuid)) self.pinpolicy = self.zkhandler.read('/domains/{}/pinpolicy'.format(self.domuuid))
except Exception: except Exception:
self.pinpolicy = "none" self.pinpolicy = "none"
try: try:
self.migration_method = zkhandler.readdata(self.zk_conn, '/domains/{}/migration_method'.format(self.domuuid)) self.migration_method = self.zkhandler.read('/domains/{}/migration_method'.format(self.domuuid))
except Exception: except Exception:
self.migration_method = 'none' self.migration_method = 'none'
@ -140,10 +147,10 @@ class VMInstance(object):
self.dom = self.lookupByUUID(self.domuuid) self.dom = self.lookupByUUID(self.domuuid)
# Log watcher instance # Log watcher instance
self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node) self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zkhandler, self.config, self.logger, self.this_node)
# Watch for changes to the state field in Zookeeper # Watch for changes to the state field in Zookeeper
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid)) @self.zkhandler.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
def watch_state(data, stat, event=""): def watch_state(data, stat, event=""):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -173,7 +180,7 @@ class VMInstance(object):
if self.dom is not None: if self.dom is not None:
memory = int(self.dom.info()[2] / 1024) memory = int(self.dom.info()[2] / 1024)
else: else:
domain_information = daemon_common.getInformationFromXML(self.zk_conn, self.domuuid) domain_information = daemon_common.getInformationFromXML(self.zkhandler, self.domuuid)
memory = int(domain_information['memory']) memory = int(domain_information['memory'])
except Exception: except Exception:
memory = 0 memory = 0
@ -195,7 +202,9 @@ class VMInstance(object):
# Add the domain to the domain_list array # Add the domain to the domain_list array
self.this_node.domain_list.append(self.domuuid) self.this_node.domain_list.append(self.domuuid)
# Push the change up to Zookeeper # Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list)}) self.zkhandler.write([
('/nodes/{}/runningdomains'.format(self.this_node.name), ' '.join(self.this_node.domain_list))
])
except Exception as e: except Exception as e:
self.logger.out('Error adding domain to list: {}'.format(e), state='e') self.logger.out('Error adding domain to list: {}'.format(e), state='e')
@ -205,7 +214,9 @@ class VMInstance(object):
# Remove the domain from the domain_list array # Remove the domain from the domain_list array
self.this_node.domain_list.remove(self.domuuid) self.this_node.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper # Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list)}) self.zkhandler.write([
('/nodes/{}/runningdomains'.format(self.this_node.name), ' '.join(self.this_node.domain_list))
])
except Exception as e: except Exception as e:
self.logger.out('Error removing domain from list: {}'.format(e), state='e') self.logger.out('Error removing domain from list: {}'.format(e), state='e')
@ -218,11 +229,17 @@ class VMInstance(object):
self.logger.out('Updating VNC data', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Updating VNC data', state='i', prefix='Domain {}'.format(self.domuuid))
port = graphics.get('port', '') port = graphics.get('port', '')
listen = graphics.get('listen', '') listen = graphics.get('listen', '')
zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): '{}:{}'.format(listen, port)}) self.zkhandler.write([
('/domains/{}/vnc'.format(self.domuuid), '{}:{}'.format(listen, port))
])
else: else:
zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): ''}) self.zkhandler.write([
('/domains/{}/vnc'.format(self.domuuid), '')
])
else: else:
zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): ''}) self.zkhandler.write([
('/domains/{}/vnc'.format(self.domuuid), '')
])
# Start up the VM # Start up the VM
def start_vm(self): def start_vm(self):
@ -251,8 +268,8 @@ class VMInstance(object):
if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING: if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
# Flush locks # Flush locks
self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
flush_locks(self.zk_conn, self.logger, self.domuuid, self.this_node) flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node)
if zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) == 'fail': if self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) == 'fail':
lv_conn.close() lv_conn.close()
self.dom = None self.dom = None
self.instart = False self.instart = False
@ -261,21 +278,27 @@ class VMInstance(object):
if curstate == libvirt.VIR_DOMAIN_RUNNING: if curstate == libvirt.VIR_DOMAIN_RUNNING:
# If it is running just update the model # If it is running just update the model
self.addDomainToList() self.addDomainToList()
zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): ''}) self.zkhandler.write([
('/domains/{}/failedreason'.format(self.domuuid), '')
])
else: else:
# Or try to create it # Or try to create it
try: try:
# Grab the domain information from Zookeeper # Grab the domain information from Zookeeper
xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid)) xmlconfig = self.zkhandler.read('/domains/{}/xml'.format(self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0) dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList() self.addDomainToList()
self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = dom self.dom = dom
zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): ''}) self.zkhandler.write([
('/domains/{}/failedreason'.format(self.domuuid), '')
])
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'fail'}) self.zkhandler.write([
zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): str(e)}) ('/domains/{}/state'.format(self.domuuid), 'fail'),
('/domains/{}/failedreason'.format(self.domuuid), str(e))
])
lv_conn.close() lv_conn.close()
self.dom = None self.dom = None
self.instart = False self.instart = False
@ -303,7 +326,9 @@ class VMInstance(object):
self.start_vm() self.start_vm()
self.addDomainToList() self.addDomainToList()
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
lv_conn.close() lv_conn.close()
self.inrestart = False self.inrestart = False
@ -334,7 +359,9 @@ class VMInstance(object):
self.removeDomainFromList() self.removeDomainFromList()
if self.inrestart is False: if self.inrestart is False:
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'stop')
])
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None self.dom = None
@ -355,7 +382,7 @@ class VMInstance(object):
time.sleep(1) time.sleep(1)
# Abort shutdown if the state changes to start # Abort shutdown if the state changes to start
current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) current_state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid))
if current_state not in ['shutdown', 'restart']: if current_state not in ['shutdown', 'restart']:
self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid))
is_aborted = True is_aborted = True
@ -368,7 +395,9 @@ class VMInstance(object):
if lvdomstate != libvirt.VIR_DOMAIN_RUNNING: if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
self.removeDomainFromList() self.removeDomainFromList()
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'stop')
])
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None self.dom = None
# Stop the log watcher # Stop the log watcher
@ -377,7 +406,9 @@ class VMInstance(object):
if tick >= self.config['vm_shutdown_timeout']: if tick >= self.config['vm_shutdown_timeout']:
self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'stop')
])
break break
self.inshutdown = False self.inshutdown = False
@ -388,7 +419,9 @@ class VMInstance(object):
if self.inrestart: if self.inrestart:
# Wait to prevent race conditions # Wait to prevent race conditions
time.sleep(1) time.sleep(1)
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
# Migrate the VM to a target host # Migrate the VM to a target host
def migrate_vm(self, force_live=False, force_shutdown=False): def migrate_vm(self, force_live=False, force_shutdown=False):
@ -405,24 +438,24 @@ class VMInstance(object):
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
# Used for sanity checking later # Used for sanity checking later
target_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) target_node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
aborted = False aborted = False
def abort_migrate(reason): def abort_migrate(reason):
zkhandler.writedata(self.zk_conn, { self.zkhandler.write([
'/domains/{}/state'.format(self.domuuid): 'start', ('/domains/{}/state'.format(self.domuuid), 'start'),
'/domains/{}/node'.format(self.domuuid): self.this_node.name, ('/domains/{}/node'.format(self.domuuid), self.this_node.name),
'/domains/{}/lastnode'.format(self.domuuid): self.last_lastnode ('/domains/{}/lastnode'.format(self.domuuid), self.last_lastnode)
}) ])
migrate_lock_node.release() migrate_lock_node.release()
migrate_lock_state.release() migrate_lock_state.release()
self.inmigrate = False self.inmigrate = False
self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid))
# Acquire exclusive lock on the domain node key # Acquire exclusive lock on the domain node key
migrate_lock_node = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) migrate_lock_node = self.zkhandler.exclusivelock('/domains/{}/node'.format(self.domuuid))
migrate_lock_state = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) migrate_lock_state = self.zkhandler.exclusivelock('/domains/{}/state'.format(self.domuuid))
migrate_lock_node.acquire() migrate_lock_node.acquire()
migrate_lock_state.acquire() migrate_lock_state.acquire()
@ -434,14 +467,14 @@ class VMInstance(object):
return return
# Synchronize nodes A (I am reader) # Synchronize nodes A (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '': if self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '':
self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
ticks = 0 ticks = 0
while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '': while self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '':
time.sleep(0.1) time.sleep(0.1)
ticks += 1 ticks += 1
if ticks > 300: if ticks > 300:
@ -457,7 +490,7 @@ class VMInstance(object):
return return
# Synchronize nodes B (I am writer) # Synchronize nodes B (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
@ -498,8 +531,10 @@ class VMInstance(object):
def migrate_shutdown(): def migrate_shutdown():
self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'shutdown'}) self.zkhandler.write([
while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) != 'stop': ('/domains/{}/state'.format(self.domuuid), 'shutdown')
])
while self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) != 'stop':
time.sleep(0.5) time.sleep(0.5)
return True return True
@ -545,7 +580,7 @@ class VMInstance(object):
return return
# Synchronize nodes C (I am writer) # Synchronize nodes C (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
@ -559,20 +594,20 @@ class VMInstance(object):
self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am reader) # Synchronize nodes D (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release() lock.release()
self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
# Wait for the receive side to complete before we declare all-done and release locks # Wait for the receive side to complete before we declare all-done and release locks
while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) != '': while self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) != '':
time.sleep(0.5) time.sleep(0.5)
migrate_lock_node.release() migrate_lock_node.release()
migrate_lock_state.release() migrate_lock_state.release()
@ -591,10 +626,12 @@ class VMInstance(object):
self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
# Ensure our lock key is populated # Ensure our lock key is populated
zkhandler.writedata(self.zk_conn, {'/locks/domain_migrate/{}'.format(self.domuuid): self.domuuid}) self.zkhandler.write([
('/locks/domain_migrate/{}'.format(self.domuuid), self.domuuid)
])
# Synchronize nodes A (I am writer) # Synchronize nodes A (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
@ -605,7 +642,7 @@ class VMInstance(object):
time.sleep(0.1) # Time for new writer to acquire the lock time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader) # Synchronize nodes B (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
@ -614,38 +651,44 @@ class VMInstance(object):
self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes C (I am reader) # Synchronize nodes C (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Set the updated data # Set the updated data
self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release() lock.release()
self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am writer) # Synchronize nodes D (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time fir reader to acquire the lock time.sleep(0.5) # Time fir reader to acquire the lock
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid) self.dom = self.lookupByUUID(self.domuuid)
if self.dom: if self.dom:
lvdomstate = self.dom.state()[0] lvdomstate = self.dom.state()[0]
if lvdomstate == libvirt.VIR_DOMAIN_RUNNING: if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
# VM has been received and started # VM has been received and started
self.addDomainToList() self.addDomainToList()
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
else: else:
# The receive somehow failed # The receive somehow failed
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'fail'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'fail'),
('/domains/{}/failed_reason'.format(self.domuuid), 'Failed to receive migration')
])
self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}'.format(self.domuuid))
else: else:
if self.node == self.this_node.name: if self.node == self.this_node.name:
if self.state in ['start']: if self.state in ['start']:
@ -653,7 +696,9 @@ class VMInstance(object):
self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid))
elif self.state in ['stop']: elif self.state in ['stop']:
# The send was shutdown-based # The send was shutdown-based
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
else: else:
# The send failed or was aborted # The send failed or was aborted
self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
@ -662,7 +707,9 @@ class VMInstance(object):
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, {'/locks/domain_migrate/{}'.format(self.domuuid): ''}) self.zkhandler.write([
('/locks/domain_migrate/{}'.format(self.domuuid), '')
])
self.inreceive = False self.inreceive = False
return return
@ -671,9 +718,9 @@ class VMInstance(object):
# #
def manage_vm_state(self): def manage_vm_state(self):
# Update the current values from zookeeper # Update the current values from zookeeper
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid))
self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid))
self.lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid))
# Check the current state of the VM # Check the current state of the VM
try: try:
@ -721,7 +768,9 @@ class VMInstance(object):
elif self.state == "migrate" or self.state == "migrate-live": elif self.state == "migrate" or self.state == "migrate-live":
# Start the log watcher # Start the log watcher
self.console_log_instance.start() self.console_log_instance.start()
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
# Add domain to running list # Add domain to running list
self.addDomainToList() self.addDomainToList()
# VM should be restarted # VM should be restarted
@ -744,7 +793,9 @@ class VMInstance(object):
self.receive_migrate() self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running) # VM should be restarted (i.e. started since it isn't running)
if self.state == "restart": if self.state == "restart":
zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) self.zkhandler.write([
('/domains/{}/state'.format(self.domuuid), 'start')
])
# VM should be shut down; ensure it's gone from this node's domain_list # VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown": elif self.state == "shutdown":
self.removeDomainFromList() self.removeDomainFromList()