[#5] Update the writer functions too

This commit is contained in:
Joshua Boniface 2018-06-26 22:52:55 -04:00
parent 4737556c2c
commit 276b618137
4 changed files with 21 additions and 28 deletions

View File

@ -34,6 +34,7 @@ import configparser
import apscheduler.schedulers.background import apscheduler.schedulers.background
import pvcd.ansiiprint as ansiiprint import pvcd.ansiiprint as ansiiprint
import pvcd.zkhandler as zkhandler
import pvcd.VMInstance as VMInstance import pvcd.VMInstance as VMInstance
import pvcd.NodeInstance as NodeInstance import pvcd.NodeInstance as NodeInstance
@ -145,7 +146,7 @@ zk_conn.add_listener(zk_listener)
def cleanup(signum, frame): def cleanup(signum, frame):
ansiiprint.echo('Terminating daemon', '', 'e') ansiiprint.echo('Terminating daemon', '', 'e')
# Set stop state in Zookeeper # Set stop state in Zookeeper
zk_conn.set('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii')) zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'stop' ])
# Close the Zookeeper connection # Close the Zookeeper connection
zk_conn.close() zk_conn.close()
# Stop keepalive thread # Stop keepalive thread
@ -179,7 +180,7 @@ print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticda
if zk_conn.exists('/nodes/{}'.format(myhostname)): if zk_conn.exists('/nodes/{}'.format(myhostname)):
print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper")
# Update static data just in case it's changed # Update static data just in case it's changed
zk_conn.set('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii')) zkhandler.writedata(zk_conn, '/nodes/{}/staticdata'.format(myhostname), [ ' '.join(staticdata) ])
else: else:
print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node") print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node")
keepalive_time = int(time.time()) keepalive_time = int(time.time())
@ -201,7 +202,7 @@ else:
transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii'))
transaction.commit() transaction.commit()
zk_conn.set('/nodes/{}/daemonstate'.format(myhostname), 'init'.encode('ascii')) zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'init' ])
t_node = dict() t_node = dict()
s_domain = dict() s_domain = dict()

View File

@ -162,14 +162,14 @@ class NodeInstance():
transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii')) transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii'))
transaction.commit() transaction.commit()
self.zk_conn.set('/nodes/{}/runningdomains'.format(self.name), ''.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.name), [ '' ])
self.zk_conn.set('/nodes/{}/domainstate'.format(self.name), 'flushed'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'flushed' ])
self.inflush = False self.inflush = False
def unflush(self): def unflush(self):
self.inflush = True self.inflush = True
ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i')
self.zk_conn.set('/nodes/{}/domainstate'.format(self.name), 'ready'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'ready' ])
for dom_uuid in self.s_domain: for dom_uuid in self.s_domain:
last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid)) last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid))
if last_hypervisor != self.name: if last_hypervisor != self.name:
@ -196,7 +196,7 @@ class NodeInstance():
past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name)) past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name))
if past_state != 'run': if past_state != 'run':
self.daemon_state = 'run' self.daemon_state = 'run'
self.zk_conn.set('/nodes/{}/daemonstate'.format(self.name), 'run'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name), [ 'run' ])
else: else:
self.daemon_state = 'run' self.daemon_state = 'run'
@ -210,7 +210,7 @@ class NodeInstance():
raise raise
except Exception as e: except Exception as e:
# Toggle a state "change" # Toggle a state "change"
self.zk_conn.set('/domains/{}/state'.format(domain), instance.getstate().encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(domain), [ instance.getstate() ])
# Set our information in zookeeper # Set our information in zookeeper
self.name = lv_conn.getHostname() self.name = lv_conn.getHostname()
@ -256,7 +256,7 @@ class NodeInstance():
if node_keepalive < node_deadtime and node_daemon_state == 'run': if node_keepalive < node_deadtime and node_daemon_state == 'run':
# CHECK VERSIONING HERE # CHECK VERSIONING HERE
ansiiprint.echo('Node {} seems dead - starting monitor for fencing'.format(node_name), '', 'w') ansiiprint.echo('Node {} seems dead - starting monitor for fencing'.format(node_name), '', 'w')
self.zk_conn.set('/nodes/{}/daemonstate'.format(node_name), 'dead'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name), [ 'dead' ])
fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn), kwargs={}) fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn), kwargs={})
fence_thread.start() fence_thread.start()
@ -351,7 +351,7 @@ def fenceNode(node_name, zk_conn):
transaction.commit() transaction.commit()
# Set node in flushed state for easy remigrating when it comes back # Set node in flushed state for easy remigrating when it comes back
zk_conn.set('/nodes/{}/domainstate'.format(node_name), 'flushed'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name), [ 'flushed' ])
# #
# Perform an IPMI fence # Perform an IPMI fence

View File

@ -72,7 +72,7 @@ class VMInstance:
# Add the domain to the domain_list array # Add the domain to the domain_list array
self.thishypervisor.domain_list.append(self.domuuid) self.thishypervisor.domain_list.append(self.domuuid)
# Push the change up to Zookeeper # Push the change up to Zookeeper
self.zk_conn.set('/nodes/{}/runningdomains'.format(self.thishypervisor.name), ' '.join(self.thishypervisor.domain_list).encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ])
except Exception as e: except Exception as e:
ansiiprint.echo('Error adding domain to list: {}'.format(e), '', 'c') ansiiprint.echo('Error adding domain to list: {}'.format(e), '', 'c')
@ -82,7 +82,7 @@ class VMInstance:
# Remove the domain from the domain_list array # Remove the domain from the domain_list array
self.thishypervisor.domain_list.remove(self.domuuid) self.thishypervisor.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper # Push the change up to Zookeeper
self.zk_conn.set('/nodes/{}/runningdomains'.format(self.thishypervisor.name), ' '.join(self.thishypervisor.domain_list).encode('ascii')) zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ])
except Exception as e: except Exception as e:
ansiiprint.echo('Error removing domain from list: {}'.format(e), '', 'c') ansiiprint.echo('Error removing domain from list: {}'.format(e), '', 'c')
@ -108,7 +108,7 @@ class VMInstance:
self.dom = dom self.dom = dom
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
ansiiprint.echo('Failed to create VM', '{}:'.format(self.domuuid), 'e') ansiiprint.echo('Failed to create VM', '{}:'.format(self.domuuid), 'e')
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'failed'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'failed' ])
self.dom = None self.dom = None
lv_conn.close() lv_conn.close()
@ -135,7 +135,7 @@ class VMInstance:
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
ansiiprint.echo('Failed to restart VM', '{}:'.format(self.domuuid), 'e') ansiiprint.echo('Failed to restart VM', '{}:'.format(self.domuuid), 'e')
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'start'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ])
lv_conn.close() lv_conn.close()
self.inrestart = False self.inrestart = False
@ -163,7 +163,7 @@ class VMInstance:
self.removeDomainFromList() self.removeDomainFromList()
if self.inrestart == False: if self.inrestart == False:
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'stop'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ])
ansiiprint.echo('Successfully stopped VM', '{}:'.format(self.domuuid), 'o') ansiiprint.echo('Successfully stopped VM', '{}:'.format(self.domuuid), 'o')
self.dom = None self.dom = None
@ -191,7 +191,7 @@ class VMInstance:
self.removeDomainFromList() self.removeDomainFromList()
if self.inrestart == False: if self.inrestart == False:
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'stop'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ])
ansiiprint.echo('Successfully shutdown VM', '{}:'.format(self.domuuid), 'o') ansiiprint.echo('Successfully shutdown VM', '{}:'.format(self.domuuid), 'o')
self.dom = None self.dom = None
@ -237,7 +237,7 @@ class VMInstance:
self.removeDomainFromList() self.removeDomainFromList()
time.sleep(1) time.sleep(1)
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'start'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ])
self.inmigrate = False self.inmigrate = False
# Receive the migration from another host (wait until VM is running) # Receive the migration from another host (wait until VM is running)
@ -322,7 +322,7 @@ class VMInstance:
self.addDomainToList() self.addDomainToList()
# VM is already running and should be but stuck in migrate state # VM is already running and should be but stuck in migrate state
elif self.state == "migrate": elif self.state == "migrate":
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'start'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ])
self.addDomainToList() self.addDomainToList()
# VM should be restarted # VM should be restarted
elif self.state == "restart": elif self.state == "restart":
@ -342,7 +342,7 @@ class VMInstance:
self.receive_migrate() self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running) # VM should be restarted (i.e. started since it isn't running)
if self.state == "restart": if self.state == "restart":
self.zk_conn.set('/domains/{}/state'.format(self.domuuid), 'start'.encode('ascii')) zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ])
# VM should be shut down; ensure it's gone from this node's domain_list # VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown": elif self.state == "shutdown":
self.removeDomainFromList() self.removeDomainFromList()

View File

@ -46,8 +46,7 @@ def writedata(zk_conn, key, data):
version = meta.version version = meta.version
new_version = version + 1 new_version = version + 1
zk_transaction = zk_conn.transaction() zk_transaction = zk_conn.transaction()
for line in data: zk_transaction.set_data(key, data.encode('ascii'))
zk_transaction.set_data(key, line.encode('ascii'))
try: try:
zk_transaction.check(key, new_version) zk_transaction.check(key, new_version)
except TypeError: except TypeError:
@ -56,10 +55,3 @@ def writedata(zk_conn, key, data):
zk_transaction.commit() zk_transaction.commit()
return 0 return 0
# Key create function
def createkey(zk_conn, key, data):
zk_transaction = zk_conn.transaction()
for line in data:
zk_transaction.create(key, line.encode('ascii'))
zk_transaction.commit()