Move Zookeeper update out of NodeInstance and into the main Daemon

Joshua Boniface 2018-10-22 20:20:27 -04:00
parent d69cb72439
commit 6b5fa3d50b
3 changed files with 260 additions and 247 deletions
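
In short, update_zookeeper() stops being a NodeInstance method and becomes a module-level function in the daemon that closes over daemon-scope globals (zk_conn, this_node, config), while the fencing helpers move into the new pvcd/fencing.py. A minimal sketch of the before/after pattern (write_keepalive is a hypothetical stand-in for the real zkhandler.writedata() calls, not code from this commit):

def write_keepalive(zk_conn, name):
    # Hypothetical helper standing in for the real keepalive-write logic
    pass

# Before: a per-instance method, reached through the object
class NodeInstance(object):
    def update_zookeeper(self):
        write_keepalive(self.zk_conn, self.name)

# After: one module-level function in Daemon.py, closing over globals
def update_zookeeper():
    write_keepalive(zk_conn, this_node.name)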

node-daemon/pvcd/Daemon.py

@@ -20,6 +20,7 @@
#
###############################################################################
# Version string for startup output
version = '0.4'
import kazoo.client
@@ -35,10 +36,12 @@ import uuid
import time
import re
import configparser
import threading
import apscheduler.schedulers.background
import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.fencing as fencing
import pvcd.common as common
import pvcd.DomainInstance as DomainInstance
@@ -319,15 +322,18 @@
    logger.out('Terminating pvcd and cleaning up', state='s')

    # Set stop state in Zookeeper
    zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })

    # Force into secondary network state if needed
    if this_node.name == this_node.primary_node:
        zkhandler.writedata(zk_conn, { '/primary_node': 'none' })

    # Wait for things to flush
    time.sleep(3)
    time.sleep(2)

    # Set stop state in Zookeeper
    zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })

    # Forcibly terminate dnsmasq because it gets stuck sometimes
    common.run_os_command('killall dnsmasq')

    # Close the Zookeeper connection
    try:
@@ -631,14 +637,154 @@ def update_domains(new_domain_list):
# PHASE 9 - Run the daemon
###############################################################################

# Set up our update function
update_zookeeper = this_node.update_zookeeper

# Zookeeper keepalive update function
def update_zookeeper():
    # Get past state and update if needed
    past_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(this_node.name))
    if past_state != 'run':
        this_node.daemon_state = 'run'
        zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(this_node.name): 'run' })
    else:
        this_node.daemon_state = 'run'

    # Ensure the primary key is properly set
    if this_node.router_state == 'primary':
        if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
            zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})

    # Toggle state management of dead VMs to restart them
    memalloc = 0
    vcpualloc = 0
    for domain, instance in this_node.d_domain.items():
        if domain in this_node.domain_list:
            # Add the allocated memory to our memalloc value
            memalloc += instance.getmemory()
            vcpualloc += instance.getvcpus()
            if instance.getstate() == 'start' and instance.getnode() == this_node.name:
                if instance.getdom() != None:
                    try:
                        if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
                            raise
                    except Exception as e:
                        # Toggle a state "change"
                        zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
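                        # NOTE (editorial, inferred from the "toggle" comment above): writing the
                        # existing 'start' value back to the domain's state key re-fires the
                        # Zookeeper watch on that key, prompting the daemon to restart the dead VM.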
    # Connect to libvirt
    libvirt_name = "qemu:///system"
    lv_conn = libvirt.open(libvirt_name)
    if lv_conn == None:
        logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
        return

    # Ensure that any running VMs are re-added to the domain_list
    running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
    for domain in running_domains:
        domain_uuid = domain.UUIDString()
        if domain_uuid not in this_node.domain_list:
            this_node.domain_list.append(domain_uuid)

    # Set our information in zookeeper
    #this_node.name = lv_conn.getHostname()
    this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
    this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
    this_node.memalloc = memalloc
    this_node.vcpualloc = vcpualloc
    this_node.cpuload = os.getloadavg()[0]
    this_node.domains_count = len(lv_conn.listDomainsID())
    keepalive_time = int(time.time())
    try:
        zkhandler.writedata(zk_conn, {
            '/nodes/{}/memused'.format(this_node.name): str(this_node.memused),
            '/nodes/{}/memfree'.format(this_node.name): str(this_node.memfree),
            '/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc),
            '/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc),
            '/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload),
            '/nodes/{}/networkscount'.format(this_node.name): str(this_node.networks_count),
            '/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count),
            '/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list),
            '/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time)
        })
    except:
        logger.out('Failed to set keepalive data', state='e')
        return

    # Close the Libvirt connection
    lv_conn.close()

    # Update our local node lists
    flushed_node_list = []
    active_node_list = []
    inactive_node_list = []
    for node_name in d_node:
        try:
            node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
            node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name))
            node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name)))
        except:
            node_daemon_state = 'unknown'
            node_domain_state = 'unknown'
            node_keepalive = 0

        # Handle deadtime and fencing if needed
        # (A node is considered dead when its keepalive timestamp is more than
        # keepalive_interval * fence_intervals seconds out-of-date while in 'run' state)
        node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) )
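        # Worked example (illustrative values, not from this commit): with
        # keepalive_interval = 5 and fence_intervals = 6, node_deadtime is
        # "now minus 30 seconds"; any node whose keepalive timestamp is older
        # than that while still in 'run' state is handed to fencing below.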
        if node_keepalive < node_deadtime and node_daemon_state == 'run':
            logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
            zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
            fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
            fence_thread.start()

        # Update the arrays
        if node_domain_state == 'flushed':
            flushed_node_list.append(node_name)
        else:
            if node_daemon_state == 'run':
                active_node_list.append(node_name)
            else:
                inactive_node_list.append(node_name)

    # List of the non-primary coordinators
    secondary_node_list = this_node.config['coordinators'].split(',')
    if secondary_node_list:
        secondary_node_list.remove(this_node.primary_node)
        for node in secondary_node_list:
            if node in inactive_node_list:
                secondary_node_list.remove(node)

    # Display node information to the terminal
    logger.out('{}{} keepalive{}'.format(logger.fmt_purple, this_node.name, logger.fmt_end), state='t')
    logger.out(
        '{bold}Domains:{nobold} {domcount} '
        '{bold}Networks:{nobold} {netcount} '
        '{bold}VM memory [MiB]:{nobold} {allocmem} '
        '{bold}Free memory [MiB]:{nobold} {freemem} '
        '{bold}Used memory [MiB]:{nobold} {usedmem} '
        '{bold}Load:{nobold} {load}'.format(
            bold=logger.fmt_bold,
            nobold=logger.fmt_end,
            domcount=this_node.domains_count,
            freemem=this_node.memfree,
            usedmem=this_node.memused,
            load=this_node.cpuload,
            allocmem=this_node.memalloc,
            netcount=this_node.networks_count
        ),
    )

    # Display cluster information to the terminal
    logger.out('{}Cluster status{}'.format(logger.fmt_purple, logger.fmt_end), state='t')
    logger.out('{}Primary coordinator:{} {}'.format(logger.fmt_bold, logger.fmt_end, this_node.primary_node))
    logger.out('{}Secondary coordinators:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(secondary_node_list)))
    logger.out('{}Active hypervisors:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(active_node_list)))
    logger.out('{}Flushed hypervisors:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(flushed_node_list)))
    logger.out('{}Inactive nodes:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(inactive_node_list)))

# Start keepalive thread and immediately update Zookeeper
startKeepaliveTimer()
update_zookeeper()

# Tick loop
# Tick loop; does nothing since everything else is async
while True:
    try:
        time.sleep(1)
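
Note: startKeepaliveTimer() is invoked above but its definition falls outside this diff's hunks. Since Daemon.py imports apscheduler.schedulers.background, a minimal sketch of how such a timer could be wired up (the function names and 5-second interval are assumptions, not code from this commit):

from apscheduler.schedulers.background import BackgroundScheduler

# Sketch only: fire update_zookeeper() periodically on a background thread
update_timer = BackgroundScheduler()

def startKeepaliveTimer(interval=5):
    update_timer.add_job(update_zookeeper, 'interval', seconds=interval)
    update_timer.start()

def stopKeepaliveTimer():
    # Shut the scheduler down cleanly, e.g. from cleanup()
    update_timer.shutdown()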

node-daemon/pvcd/NodeInstance.py

@@ -27,7 +27,6 @@ import socket
import time
import libvirt
import threading
import subprocess
import pvcd.log as log
import pvcd.zkhandler as zkhandler
@@ -421,169 +420,6 @@
        self.inflush = False

    def update_zookeeper(self):
        # Connect to libvirt
        libvirt_name = "qemu:///system"
        lv_conn = libvirt.open(libvirt_name)
        if lv_conn == None:
            self.logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
            return

        # Get past state and update if needed
        past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name))
        if past_state != 'run':
            self.daemon_state = 'run'
            zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' })
        else:
            self.daemon_state = 'run'

        # Ensure the primary key is properly set
        if self.name == self.this_node:
            if self.router_state == 'primary':
                if zkhandler.readdata(self.zk_conn, '/primary_node') != self.name:
                    zkhandler.writedata(self.zk_conn, {'/primary_node': self.name})

        # Toggle state management of dead VMs to restart them
        memalloc = 0
        vcpualloc = 0
        for domain, instance in self.d_domain.items():
            if domain in self.domain_list:
                # Add the allocated memory to our memalloc value
                memalloc += instance.getmemory()
                vcpualloc += instance.getvcpus()
                if instance.getstate() == 'start' and instance.getnode() == self.name:
                    if instance.getdom() != None:
                        try:
                            if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
                                raise
                        except Exception as e:
                            # Toggle a state "change"
                            zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })

        # Ensure that any running VMs are re-added to the domain_list
        running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
        for domain in running_domains:
            domain_uuid = domain.UUIDString()
            if domain_uuid not in self.domain_list:
                self.domain_list.append(domain_uuid)

        # Set our information in zookeeper
        #self.name = lv_conn.getHostname()
        self.memused = int(psutil.virtual_memory().used / 1024 / 1024)
        self.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
        self.memalloc = memalloc
        self.vcpualloc = vcpualloc
        self.cpuload = os.getloadavg()[0]
        self.domains_count = len(lv_conn.listDomainsID())
        keepalive_time = int(time.time())
        try:
            zkhandler.writedata(self.zk_conn, {
                '/nodes/{}/memused'.format(self.name): str(self.memused),
                '/nodes/{}/memfree'.format(self.name): str(self.memfree),
                '/nodes/{}/memalloc'.format(self.name): str(self.memalloc),
                '/nodes/{}/vcpualloc'.format(self.name): str(self.vcpualloc),
                '/nodes/{}/cpuload'.format(self.name): str(self.cpuload),
                '/nodes/{}/networkscount'.format(self.name): str(self.networks_count),
                '/nodes/{}/domainscount'.format(self.name): str(self.domains_count),
                '/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list),
                '/nodes/{}/keepalive'.format(self.name): str(keepalive_time)
            })
        except:
            self.logger.out('Failed to set keepalive data', state='e')
            return

        # Close the Libvirt connection
        lv_conn.close()

        # Update our local node lists
        for node_name in self.d_node:
            try:
                node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name))
                node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name))
                node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name)))
            except:
                node_daemon_state = 'unknown'
                node_domain_state = 'unknown'
                node_keepalive = 0

            # Handle deadtime and fencing if needed
            # (A node is considered dead when its keepalive timestamp is more than
            # keepalive_interval * fence_intervals seconds out-of-date while in 'run' state)
            node_deadtime = int(time.time()) - ( int(self.config['keepalive_interval']) * int(self.config['fence_intervals']) )
            if node_keepalive < node_deadtime and node_daemon_state == 'run':
                self.logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
                zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
                fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn, self.config, self.logger), kwargs={})
                fence_thread.start()

            # Update the arrays
            if node_daemon_state == 'run' and node_domain_state != 'flushed' and node_name not in self.active_node_list:
                self.active_node_list.append(node_name)
                try:
                    self.flushed_node_list.remove(node_name)
                except ValueError:
                    pass
                try:
                    self.inactive_node_list.remove(node_name)
                except ValueError:
                    pass
            if node_daemon_state != 'run' and node_domain_state != 'flushed' and node_name not in self.inactive_node_list:
                self.inactive_node_list.append(node_name)
                try:
                    self.active_node_list.remove(node_name)
                except ValueError:
                    pass
                try:
                    self.flushed_node_list.remove(node_name)
                except ValueError:
                    pass
            if node_domain_state == 'flushed' and node_name not in self.flushed_node_list:
                self.flushed_node_list.append(node_name)
                try:
                    self.active_node_list.remove(node_name)
                except ValueError:
                    pass
                try:
                    self.inactive_node_list.remove(node_name)
                except ValueError:
                    pass

        # List of the non-primary coordinators
        secondary_node_list = self.config['coordinators'].split(',')
        if secondary_node_list:
            secondary_node_list.remove(self.primary_node)
            for node in secondary_node_list:
                if node in self.inactive_node_list:
                    secondary_node_list.remove(node)

        # Display node information to the terminal
        self.logger.out('{}{} keepalive{}'.format(self.logger.fmt_purple, self.name, self.logger.fmt_end), state='t')
        self.logger.out(
            '{bold}Domains:{nobold} {domcount} '
            '{bold}Networks:{nobold} {netcount} '
            '{bold}VM memory [MiB]:{nobold} {allocmem} '
            '{bold}Free memory [MiB]:{nobold} {freemem} '
            '{bold}Used memory [MiB]:{nobold} {usedmem} '
            '{bold}Load:{nobold} {load}'.format(
                bold=self.logger.fmt_bold,
                nobold=self.logger.fmt_end,
                domcount=self.domains_count,
                freemem=self.memfree,
                usedmem=self.memused,
                load=self.cpuload,
                allocmem=self.memalloc,
                netcount=self.networks_count
            ),
        )

        # Display cluster information to the terminal
        self.logger.out('{}Cluster status{}'.format(self.logger.fmt_purple, self.logger.fmt_end), state='t')
        self.logger.out('{}Primary coordinator:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, self.primary_node))
        self.logger.out('{}Secondary coordinators:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(secondary_node_list)))
        self.logger.out('{}Active hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.active_node_list)))
        self.logger.out('{}Flushed hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.flushed_node_list)))
        self.logger.out('{}Inactive nodes:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.inactive_node_list)))
#
# Find a migration target
#
@@ -681,79 +517,3 @@ def findTargetHypervisorVMs(zk_conn, dom_uuid):
            target_node = node

    return target_node

#
# Fence thread entry function
#
def fenceNode(node_name, zk_conn, config, logger):
    failcount = 0
    # We allow exactly 3 saving throws for the host to come back online
    while failcount < 3:
        # Wait 5 seconds
        time.sleep(5)
        # Get the state
        node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
        # Is it still 'dead'?
        if node_daemon_state == 'dead':
            failcount += 1
            logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
        # It changed back to something else, so it must be alive
        else:
            logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
            return

    logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='e')

    # Get IPMI information
    ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
    ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
    ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))

    # Shoot it in the head
    fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
    # Hold to ensure the fence takes effect
    time.sleep(3)

    # Force into secondary network state if needed
    if node_name in config['coordinators'].split(','):
        zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
        if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
            zkhandler.writedata(zk_conn, { '/primary_node': 'none' })

    # If the fence succeeded and successful_fence is migrate
    if fence_status == True and config['successful_fence'] == 'migrate':
        migrateFromFencedNode(zk_conn, node_name, logger)
    # If the fence failed and failed_fence is migrate
    if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
        migrateFromFencedNode(zk_conn, node_name, logger)

# Migrate VMs away from a fenced node
def migrateFromFencedNode(zk_conn, node_name, logger):
    logger.out('Moving VMs from dead node "{}" to new hosts'.format(node_name), state='i')
    dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
    for dom_uuid in dead_node_running_domains:
        target_node = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
        logger.out('Moving VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
        zkhandler.writedata(zk_conn, {
            '/domains/{}/state'.format(dom_uuid): 'start',
            '/domains/{}/node'.format(dom_uuid): target_node,
            '/domains/{}/lastnode'.format(dom_uuid): node_name
        })

    # Set node in flushed state for easy remigrating when it comes back
    zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })

#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
    ipmi_command = ['/usr/bin/ipmitool', '-I', 'lanplus', '-H', ipmi_hostname, '-U', ipmi_user, '-P', ipmi_password, 'chassis', 'power', 'reset']
    ipmi_command_output = subprocess.run(ipmi_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if ipmi_command_output.returncode == 0:
        logger.out('Successfully rebooted dead node', state='o')
        return True
    else:
        logger.out('Failed to reboot dead node', state='e')
        return False

node-daemon/pvcd/fencing.py (new file, 107 lines)

@@ -0,0 +1,107 @@
#!/usr/bin/env python3

# fencing.py - PVC daemon function library, node fencing functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import time
import threading

import pvcd.zkhandler as zkhandler
import pvcd.common as common

#
# Fence thread entry function
#
def fenceNode(node_name, zk_conn, config, logger):
    failcount = 0
    # We allow exactly 3 saving throws for the host to come back online
    while failcount < 3:
        # Wait 5 seconds
        time.sleep(5)
        # Get the state
        node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
        # Is it still 'dead'?
        if node_daemon_state == 'dead':
            failcount += 1
            logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
        # It changed back to something else, so it must be alive
        else:
            logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
            return

    logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='w')

    # Get IPMI information
    ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
    ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
    ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))

    # Shoot it in the head
    fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
    # Hold to ensure the fence takes effect
    time.sleep(3)

    # Force into secondary network state if needed
    if node_name in config['coordinators'].split(','):
        zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
        if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
            zkhandler.writedata(zk_conn, { '/primary_node': 'none' })

    # If the fence succeeded and successful_fence is migrate
    if fence_status == True and config['successful_fence'] == 'migrate':
        migrateFromFencedNode(zk_conn, node_name, logger)
    # If the fence failed and failed_fence is migrate
    if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
        migrateFromFencedNode(zk_conn, node_name, logger)
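# Illustrative timeline (editorial note; values taken from the code above):
# node marked 'dead' -> 3 polls at 5 s intervals (15 s grace period) ->
# IPMI chassis power reset -> 3 s hold -> VMs migrated off according to the
# successful_fence / failed_fence configuration keys.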
# Migrate VMs away from a fenced node
def migrateFromFencedNode(zk_conn, node_name, logger):
    logger.out('Moving VMs from dead node "{}" to new hosts'.format(node_name), state='i')
    dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
    for dom_uuid in dead_node_running_domains:
        target_node = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
        logger.out('Moving VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
        zkhandler.writedata(zk_conn, {
            '/domains/{}/state'.format(dom_uuid): 'start',
            '/domains/{}/node'.format(dom_uuid): target_node,
            '/domains/{}/lastnode'.format(dom_uuid): node_name
        })

    # Set node in flushed state for easy remigrating when it comes back
    zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })

#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
    ipmi_command = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power reset'.format(
        ipmi_hostname, ipmi_user, ipmi_password
    )
    ipmi_command_retcode, ipmi_command_stdout, ipmi_command_stderr = common.run_os_command(ipmi_command)
    if ipmi_command_retcode == 0:
        logger.out('Successfully rebooted dead node', state='o')
        return True
    else:
        logger.out('Failed to reboot dead node', state='e')
        return False
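
For context, the daemon consumes this module's entry point by spawning it on a background thread (the call site appears in Daemon.py's update_zookeeper() above), so a fence monitor never blocks the keepalive loop. Condensed restatement of that call site:

import threading
import pvcd.fencing as fencing

# As in Daemon.py above: one monitor thread per suspect node
fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
fence_thread.start()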