Convert NodeInstance to new ZK schema handler

This commit is contained in:
Joshua Boniface 2021-06-09 22:07:32 -04:00
parent b94fe88405
commit 450bf6b153
1 changed files with 59 additions and 59 deletions

View File

@ -38,7 +38,7 @@ class NodeInstance(object):
# Which node is primary # Which node is primary
self.primary_node = None self.primary_node = None
# States # States
self.daemon_mode = self.zkhandler.read('/nodes/{}/daemonmode'.format(self.name)) self.daemon_mode = self.zkhandler.read(('node.mode.daemon', self.name))
self.daemon_state = 'stop' self.daemon_state = 'stop'
self.router_state = 'client' self.router_state = 'client'
self.domain_state = 'ready' self.domain_state = 'ready'
@ -90,7 +90,7 @@ class NodeInstance(object):
self.flush_stopper = False self.flush_stopper = False
# Zookeeper handlers for changed states # Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.daemon', self.name))
def watch_node_daemonstate(data, stat, event=''): def watch_node_daemonstate(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -105,7 +105,7 @@ class NodeInstance(object):
if data != self.daemon_state: if data != self.daemon_state:
self.daemon_state = data self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.router', self.name))
def watch_node_routerstate(data, stat, event=''): def watch_node_routerstate(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -135,10 +135,10 @@ class NodeInstance(object):
else: else:
# We did nothing, so just become secondary state # We did nothing, so just become secondary state
self.zkhandler.write([ self.zkhandler.write([
('/nodes/{}/routerstate'.format(self.name), 'secondary') (('node.state.router', self.name), 'secondary')
]) ])
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.domain', self.name))
def watch_node_domainstate(data, stat, event=''): def watch_node_domainstate(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -171,7 +171,7 @@ class NodeInstance(object):
self.flush_thread = Thread(target=self.unflush, args=(), kwargs={}) self.flush_thread = Thread(target=self.unflush, args=(), kwargs={})
self.flush_thread.start() self.flush_thread.start()
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.free', self.name))
def watch_node_memfree(data, stat, event=''): def watch_node_memfree(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -186,7 +186,7 @@ class NodeInstance(object):
if data != self.memfree: if data != self.memfree:
self.memfree = data self.memfree = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.used', self.name))
def watch_node_memused(data, stat, event=''): def watch_node_memused(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -201,7 +201,7 @@ class NodeInstance(object):
if data != self.memused: if data != self.memused:
self.memused = data self.memused = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.allocated', self.name))
def watch_node_memalloc(data, stat, event=''): def watch_node_memalloc(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -216,7 +216,7 @@ class NodeInstance(object):
if data != self.memalloc: if data != self.memalloc:
self.memalloc = data self.memalloc = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.vcpu.allocated', self.name))
def watch_node_vcpualloc(data, stat, event=''): def watch_node_vcpualloc(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -231,7 +231,7 @@ class NodeInstance(object):
if data != self.vcpualloc: if data != self.vcpualloc:
self.vcpualloc = data self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.running_domains', self.name))
def watch_node_runningdomains(data, stat, event=''): def watch_node_runningdomains(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -246,7 +246,7 @@ class NodeInstance(object):
if data != self.domain_list: if data != self.domain_list:
self.domain_list = data self.domain_list = data
@self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name)) @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.count.provisioned_domainss', self.name))
def watch_node_domainscount(data, stat, event=''): def watch_node_domainscount(data, stat, event=''):
if event and event.type == 'DELETED': if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher # The key has been deleted after existing before; terminate this watcher
@ -324,30 +324,30 @@ class NodeInstance(object):
Acquire primary coordinator status from a peer node Acquire primary coordinator status from a peer node
""" """
# Lock the primary node until transition is complete # Lock the primary node until transition is complete
primary_lock = self.zkhandler.exclusivelock('/config/primary_node') primary_lock = self.zkhandler.exclusivelock('base.config.primary_node')
primary_lock.acquire() primary_lock.acquire()
# Ensure our lock key is populated # Ensure our lock key is populated
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
# Synchronize nodes A (I am writer) # Synchronize nodes A (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase A', state='i') self.logger.out('Acquiring write lock for synchronization phase A', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o') self.logger.out('Acquired write lock for synchronization phase A', state='o')
time.sleep(1) # Time fir reader to acquire the lock time.sleep(1) # Time fir reader to acquire the lock
self.logger.out('Releasing write lock for synchronization phase A', state='i') self.logger.out('Releasing write lock for synchronization phase A', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase A', state='o') self.logger.out('Released write lock for synchronization phase A', state='o')
time.sleep(0.1) # Time fir new writer to acquire the lock time.sleep(0.1) # Time fir new writer to acquire the lock
# Synchronize nodes B (I am reader) # Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase B', state='i') self.logger.out('Acquiring read lock for synchronization phase B', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o') self.logger.out('Acquired read lock for synchronization phase B', state='o')
@ -356,7 +356,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase B', state='o') self.logger.out('Released read lock for synchronization phase B', state='o')
# Synchronize nodes C (I am writer) # Synchronize nodes C (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase C', state='i') self.logger.out('Acquiring write lock for synchronization phase C', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o') self.logger.out('Acquired write lock for synchronization phase C', state='o')
@ -373,13 +373,13 @@ class NodeInstance(object):
common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream') common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
self.logger.out('Releasing write lock for synchronization phase C', state='i') self.logger.out('Releasing write lock for synchronization phase C', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase C', state='o') self.logger.out('Released write lock for synchronization phase C', state='o')
# Synchronize nodes D (I am writer) # Synchronize nodes D (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase D', state='i') self.logger.out('Acquiring write lock for synchronization phase D', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o') self.logger.out('Acquired write lock for synchronization phase D', state='o')
@ -405,13 +405,13 @@ class NodeInstance(object):
common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage') common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
self.logger.out('Releasing write lock for synchronization phase D', state='i') self.logger.out('Releasing write lock for synchronization phase D', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o') self.logger.out('Released write lock for synchronization phase D', state='o')
# Synchronize nodes E (I am writer) # Synchronize nodes E (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase E', state='i') self.logger.out('Acquiring write lock for synchronization phase E', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase E', state='o') self.logger.out('Acquired write lock for synchronization phase E', state='o')
@ -428,13 +428,13 @@ class NodeInstance(object):
common.createIPAddress('169.254.169.254', '32', 'lo') common.createIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing write lock for synchronization phase E', state='i') self.logger.out('Releasing write lock for synchronization phase E', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase E', state='o') self.logger.out('Released write lock for synchronization phase E', state='o')
# Synchronize nodes F (I am writer) # Synchronize nodes F (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase F', state='i') self.logger.out('Acquiring write lock for synchronization phase F', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase F', state='o') self.logger.out('Acquired write lock for synchronization phase F', state='o')
@ -444,13 +444,13 @@ class NodeInstance(object):
self.d_network[network].createGateways() self.d_network[network].createGateways()
self.logger.out('Releasing write lock for synchronization phase F', state='i') self.logger.out('Releasing write lock for synchronization phase F', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase F', state='o') self.logger.out('Released write lock for synchronization phase F', state='o')
# Synchronize nodes G (I am writer) # Synchronize nodes G (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase G', state='i') self.logger.out('Acquiring write lock for synchronization phase G', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase G', state='o') self.logger.out('Acquired write lock for synchronization phase G', state='o')
@ -518,7 +518,7 @@ class NodeInstance(object):
self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e') self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e')
self.logger.out('Releasing write lock for synchronization phase G', state='i') self.logger.out('Releasing write lock for synchronization phase G', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase G', state='o') self.logger.out('Released write lock for synchronization phase G', state='o')
@ -527,7 +527,7 @@ class NodeInstance(object):
time.sleep(2) time.sleep(2)
primary_lock.release() primary_lock.release()
self.zkhandler.write([ self.zkhandler.write([
('/nodes/{}/routerstate'.format(self.name), 'primary') (('node.state.router', self.name), 'primary')
]) ])
self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o') self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
@ -538,7 +538,7 @@ class NodeInstance(object):
time.sleep(0.2) # Initial delay for the first writer to grab the lock time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Synchronize nodes A (I am reader) # Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase A', state='i') self.logger.out('Acquiring read lock for synchronization phase A', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o') self.logger.out('Acquired read lock for synchronization phase A', state='o')
@ -547,7 +547,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase A', state='o') self.logger.out('Released read lock for synchronization phase A', state='o')
# Synchronize nodes B (I am writer) # Synchronize nodes B (I am writer)
lock = self.zkhandler.writelock('/locks/primary_node') lock = self.zkhandler.writelock('base.lock.primary_node')
self.logger.out('Acquiring write lock for synchronization phase B', state='i') self.logger.out('Acquiring write lock for synchronization phase B', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o') self.logger.out('Acquired write lock for synchronization phase B', state='o')
@ -559,7 +559,7 @@ class NodeInstance(object):
self.d_network[network].stopDHCPServer() self.d_network[network].stopDHCPServer()
self.logger.out('Releasing write lock for synchronization phase B', state='i') self.logger.out('Releasing write lock for synchronization phase B', state='i')
self.zkhandler.write([ self.zkhandler.write([
('/locks/primary_node', '') ('base.lock.primary_node', '')
]) ])
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase B', state='o') self.logger.out('Released write lock for synchronization phase B', state='o')
@ -572,7 +572,7 @@ class NodeInstance(object):
time.sleep(0.1) # Time fir new writer to acquire the lock time.sleep(0.1) # Time fir new writer to acquire the lock
# Synchronize nodes C (I am reader) # Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase C', state='i') self.logger.out('Acquiring read lock for synchronization phase C', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o') self.logger.out('Acquired read lock for synchronization phase C', state='o')
@ -591,7 +591,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase C', state='o') self.logger.out('Released read lock for synchronization phase C', state='o')
# Synchronize nodes D (I am reader) # Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase D', state='i') self.logger.out('Acquiring read lock for synchronization phase D', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o') self.logger.out('Acquired read lock for synchronization phase D', state='o')
@ -619,7 +619,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase D', state='o') self.logger.out('Released read lock for synchronization phase D', state='o')
# Synchronize nodes E (I am reader) # Synchronize nodes E (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase E', state='i') self.logger.out('Acquiring read lock for synchronization phase E', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase E', state='o') self.logger.out('Acquired read lock for synchronization phase E', state='o')
@ -638,7 +638,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase E', state='o') self.logger.out('Released read lock for synchronization phase E', state='o')
# Synchronize nodes F (I am reader) # Synchronize nodes F (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase F', state='i') self.logger.out('Acquiring read lock for synchronization phase F', state='i')
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase F', state='o') self.logger.out('Acquired read lock for synchronization phase F', state='o')
@ -650,7 +650,7 @@ class NodeInstance(object):
self.logger.out('Released read lock for synchronization phase F', state='o') self.logger.out('Released read lock for synchronization phase F', state='o')
# Synchronize nodes G (I am reader) # Synchronize nodes G (I am reader)
lock = self.zkhandler.readlock('/locks/primary_node') lock = self.zkhandler.readlock('base.lock.primary_node')
self.logger.out('Acquiring read lock for synchronization phase G', state='i') self.logger.out('Acquiring read lock for synchronization phase G', state='i')
try: try:
lock.acquire(timeout=60) # Don't wait forever and completely block us lock.acquire(timeout=60) # Don't wait forever and completely block us
@ -664,7 +664,7 @@ class NodeInstance(object):
# Wait 2 seconds for everything to stabilize before we declare all-done # Wait 2 seconds for everything to stabilize before we declare all-done
time.sleep(2) time.sleep(2)
self.zkhandler.write([ self.zkhandler.write([
('/nodes/{}/routerstate'.format(self.name), 'secondary') (('node.state.router', self.name), 'secondary')
]) ])
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o') self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
@ -685,10 +685,10 @@ class NodeInstance(object):
self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i') self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
# Don't replace the previous node if the VM is already migrated # Don't replace the previous node if the VM is already migrated
if self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)): if self.zkhandler.read(('domain.last_node', dom_uuid)):
current_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) current_node = self.zkhandler.read(('domain.last_node', dom_uuid))
else: else:
current_node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid)) current_node = self.zkhandler.read(('domain.node', dom_uuid))
target_node = common.findTargetNode(self.zkhandler, dom_uuid) target_node = common.findTargetNode(self.zkhandler, dom_uuid)
if target_node == current_node: if target_node == current_node:
@ -697,20 +697,20 @@ class NodeInstance(object):
if target_node is None: if target_node is None:
self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e') self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
self.zkhandler.write([ self.zkhandler.write([
('/domains/{}/state'.format(dom_uuid), 'shutdown'), (('domain.state', dom_uuid), 'shutdown'),
('/domains/{}/node_autostart'.format(dom_uuid), 'True') (('domain.meta.autostart', dom_uuid), 'True'),
]) ])
else: else:
self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i') self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
self.zkhandler.write([ self.zkhandler.write([
('/domains/{}/state'.format(dom_uuid), 'migrate'), (('domain.state', dom_uuid), 'migrate'),
('/domains/{}/node'.format(dom_uuid), target_node), (('domain.node', dom_uuid), target_node),
('/domains/{}/lastnode'.format(dom_uuid), current_node) (('domain.last_node', dom_uuid), current_node),
]) ])
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways) # Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
ticks = 0 ticks = 0
while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']: while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
ticks += 1 ticks += 1
if ticks > 600: if ticks > 600:
# Abort if we've waited for 120 seconds, the VM is messed and just continue # Abort if we've waited for 120 seconds, the VM is messed and just continue
@ -718,8 +718,8 @@ class NodeInstance(object):
time.sleep(0.2) time.sleep(0.2)
self.zkhandler.write([ self.zkhandler.write([
('/nodes/{}/runningdomains'.format(self.name), ''), (('node.running_domains', self.name), ''),
('/nodes/{}/domainstate'.format(self.name), 'flushed') (('node.state.domain', self.name), 'flushed'),
]) ])
self.flush_thread = None self.flush_thread = None
self.flush_stopper = False self.flush_stopper = False
@ -737,20 +737,20 @@ class NodeInstance(object):
return return
# Handle autostarts # Handle autostarts
autostart = self.zkhandler.read('/domains/{}/node_autostart'.format(dom_uuid)) autostart = self.zkhandler.read(('domain.meta.autostart', dom_uuid))
node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid)) node = self.zkhandler.read(('domain.node', dom_uuid))
if autostart == 'True' and node == self.name: if autostart == 'True' and node == self.name:
self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i') self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
self.zkhandler.write([ self.zkhandler.write([
('/domains/{}/state'.format(dom_uuid), 'start'), (('domain.state', dom_uuid), 'start'),
('/domains/{}/node'.format(dom_uuid), self.name), (('domain.node', dom_uuid), self.name),
('/domains/{}/lastnode'.format(dom_uuid), ''), (('domain.last_node', dom_uuid), ''),
('/domains/{}/node_autostart'.format(dom_uuid), 'False') (('domain.meta.autostart', dom_uuid), 'False'),
]) ])
continue continue
try: try:
last_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) last_node = self.zkhandler.read(('domain.last_node', dom_uuid))
except Exception: except Exception:
continue continue
@ -759,17 +759,17 @@ class NodeInstance(object):
self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i') self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
self.zkhandler.write([ self.zkhandler.write([
('/domains/{}/state'.format(dom_uuid), 'migrate'), (('domain.state', dom_uuid), 'migrate'),
('/domains/{}/node'.format(dom_uuid), self.name), (('domain.node', dom_uuid), self.name),
('/domains/{}/lastnode'.format(dom_uuid), '') (('domain.last_node', dom_uuid), ''),
]) ])
# Wait for the VM to migrate back # Wait for the VM to migrate back
while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']: while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
time.sleep(0.1) time.sleep(0.1)
self.zkhandler.write([ self.zkhandler.write([
('/nodes/{}/domainstate'.format(self.name), 'ready') (('node.state.domain', self.name), 'ready')
]) ])
self.flush_thread = None self.flush_thread = None
self.flush_stopper = False self.flush_stopper = False