diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py
index 7cab9dfc..275e8372 100644
--- a/node-daemon/pvcd/Daemon.py
+++ b/node-daemon/pvcd/Daemon.py
@@ -20,6 +20,7 @@
 #
 ###############################################################################
 
+# Version string for startup output
 version = '0.4'
 
 import kazoo.client
@@ -35,10 +36,12 @@ import uuid
 import time
 import re
 import configparser
+import threading
 import apscheduler.schedulers.background
 
 import pvcd.log as log
 import pvcd.zkhandler as zkhandler
+import pvcd.fencing as fencing
 import pvcd.common as common
 
 import pvcd.DomainInstance as DomainInstance
@@ -319,15 +322,18 @@ def cleanup():
     logger.out('Terminating pvcd and cleaning up', state='s')
 
-    # Set stop state in Zookeeper
-    zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
-
     # Force into secondary network state if needed
     if this_node.name == this_node.primary_node:
         zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
 
     # Wait for things to flush
-    time.sleep(3)
+    time.sleep(2)
+
+    # Set stop state in Zookeeper
+    zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
+
+    # Forcibly terminate dnsmasq because it gets stuck sometimes
+    common.run_os_command('killall dnsmasq')
 
     # Close the Zookeeper connection
     try:
@@ -631,14 +637,154 @@ def update_domains(new_domain_list):
 # PHASE 9 - Run the daemon
 ###############################################################################
 
-# Set up our update function
-update_zookeeper = this_node.update_zookeeper
+# Zookeeper keepalive update function
+def update_zookeeper():
+    # Get past state and update if needed
+    past_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(this_node.name))
+    if past_state != 'run':
+        this_node.daemon_state = 'run'
+        zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(this_node.name): 'run' })
+    else:
+        this_node.daemon_state = 'run'
+
+    # Ensure the primary key is properly set
+    if this_node.router_state == 'primary':
+        if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
+            zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
+
+    # Toggle state management of dead VMs to restart them
+    memalloc = 0
+    vcpualloc = 0
+    for domain, instance in this_node.d_domain.items():
+        if domain in this_node.domain_list:
+            # Add the allocated memory to our memalloc value
+            memalloc += instance.getmemory()
+            vcpualloc += instance.getvcpus()
+            if instance.getstate() == 'start' and instance.getnode() == this_node.name:
+                if instance.getdom() != None:
+                    try:
+                        if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+                            raise
+                    except Exception as e:
+                        # Toggle a state "change"
+                        zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
+
+    # Connect to libvirt
+    libvirt_name = "qemu:///system"
+    lv_conn = libvirt.open(libvirt_name)
+    if lv_conn == None:
+        logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
+        return
+
+    # Ensure that any running VMs are re-added to the domain_list
+    running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
+    for domain in running_domains:
+        domain_uuid = domain.UUIDString()
+        if domain_uuid not in this_node.domain_list:
+            this_node.domain_list.append(domain_uuid)
+
+    # Set our information in zookeeper
+    #this_node.name = lv_conn.getHostname()
+    this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
+    this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
+    this_node.memalloc = memalloc
+    this_node.vcpualloc = vcpualloc
+    this_node.cpuload = os.getloadavg()[0]
+    this_node.domains_count = len(lv_conn.listDomainsID())
+    keepalive_time = int(time.time())
+    try:
+        zkhandler.writedata(zk_conn, {
+            '/nodes/{}/memused'.format(this_node.name): str(this_node.memused),
+            '/nodes/{}/memfree'.format(this_node.name): str(this_node.memfree),
+            '/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc),
+            '/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc),
+            '/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload),
+            '/nodes/{}/networkscount'.format(this_node.name): str(this_node.networks_count),
+            '/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count),
+            '/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list),
+            '/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time)
+        })
+    except:
+        logger.out('Failed to set keepalive data', state='e')
+        return
+
+    # Close the Libvirt connection
+    lv_conn.close()
+
+    # Update our local node lists
+    flushed_node_list = []
+    active_node_list = []
+    inactive_node_list = []
+    for node_name in d_node:
+        try:
+            node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
+            node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name))
+            node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name)))
+        except:
+            node_daemon_state = 'unknown'
+            node_domain_state = 'unknown'
+            node_keepalive = 0
+
+        # Handle deadtime and fencing if needed
+        # (A node is considered dead when its keepalive timestamp is more than
+        # keepalive_interval * fence_intervals seconds out of date while in 'run' state)
+        node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) )
+        if node_keepalive < node_deadtime and node_daemon_state == 'run':
+            logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
+            zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
+            fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
+            fence_thread.start()
+
+        # Update the arrays
+        if node_domain_state == 'flushed':
+            flushed_node_list.append(node_name)
+        else:
+            if node_daemon_state == 'run':
+                active_node_list.append(node_name)
+            else:
+                inactive_node_list.append(node_name)
+
+    # List of the non-primary coordinators
+    secondary_node_list = this_node.config['coordinators'].split(',')
+    if secondary_node_list:
+        secondary_node_list.remove(this_node.primary_node)
+        for node in secondary_node_list:
+            if node in inactive_node_list:
+                secondary_node_list.remove(node)
+
+    # Display node information to the terminal
+    logger.out('{}{} keepalive{}'.format(logger.fmt_purple, this_node.name, logger.fmt_end), state='t')
+    logger.out(
+        '{bold}Domains:{nobold} {domcount} '
+        '{bold}Networks:{nobold} {netcount} '
+        '{bold}VM memory [MiB]:{nobold} {allocmem} '
+        '{bold}Free memory [MiB]:{nobold} {freemem} '
+        '{bold}Used memory [MiB]:{nobold} {usedmem} '
+        '{bold}Load:{nobold} {load}'.format(
+            bold=logger.fmt_bold,
+            nobold=logger.fmt_end,
+            domcount=this_node.domains_count,
+            freemem=this_node.memfree,
+            usedmem=this_node.memused,
+            load=this_node.cpuload,
+            allocmem=this_node.memalloc,
+            netcount=this_node.networks_count
+        ),
+    )
+
+    # Display cluster information to the terminal
+    logger.out('{}Cluster status{}'.format(logger.fmt_purple, logger.fmt_end), state='t')
+    logger.out('{}Primary coordinator:{} {}'.format(logger.fmt_bold, logger.fmt_end, this_node.primary_node))
+    logger.out('{}Secondary coordinators:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(secondary_node_list)))
+    logger.out('{}Active hypervisors:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(active_node_list)))
+    logger.out('{}Flushed hypervisors:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(flushed_node_list)))
+    logger.out('{}Inactive nodes:{} {}'.format(logger.fmt_bold, logger.fmt_end, ' '.join(inactive_node_list)))
 
 # Start keepalive thread and immediately update Zookeeper
 startKeepaliveTimer()
 update_zookeeper()
 
-# Tick loop
+# Tick loop; does nothing since everything else is async
 while True:
     try:
         time.sleep(1)
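The keepalive logic above is driven by `startKeepaliveTimer()`, which is defined earlier in Daemon.py and is not part of this diff. For orientation, here is a minimal sketch of how such a timer could be wired up with APScheduler; the scheduler class and job options below are assumptions for illustration, not the daemon's actual code, and `config['keepalive_interval']` is assumed to be the interval in seconds:

```python
# Hypothetical sketch only -- the real startKeepaliveTimer() lives elsewhere in Daemon.py
from apscheduler.schedulers.background import BackgroundScheduler

update_timer = BackgroundScheduler()

def startKeepaliveTimer():
    # Run update_zookeeper() every keepalive_interval seconds in a background thread
    update_timer.add_job(update_zookeeper, 'interval', seconds=int(config['keepalive_interval']))
    update_timer.start()

def stopKeepaliveTimer():
    # Stop scheduling further keepalives (e.g. during cleanup)
    update_timer.shutdown()
```

With a timer like this in place, the final `while True: time.sleep(1)` loop only keeps the main thread alive; the real work happens in the scheduler's thread and in the fencing threads spawned by `update_zookeeper()`.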
diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py
index ed8d0082..508ec16f 100644
--- a/node-daemon/pvcd/NodeInstance.py
+++ b/node-daemon/pvcd/NodeInstance.py
@@ -27,7 +27,6 @@ import socket
 import time
 import libvirt
 import threading
-import subprocess
 
 import pvcd.log as log
 import pvcd.zkhandler as zkhandler
@@ -421,169 +420,6 @@ class NodeInstance(object):
 
         self.inflush = False
 
-    def update_zookeeper(self):
-        # Connect to libvirt
-        libvirt_name = "qemu:///system"
-        lv_conn = libvirt.open(libvirt_name)
-        if lv_conn == None:
-            self.logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
-            return
-
-        # Get past state and update if needed
-        past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name))
-        if past_state != 'run':
-            self.daemon_state = 'run'
-            zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' })
-        else:
-            self.daemon_state = 'run'
-
-        # Ensure the primary key is properly set
-        if self.name == self.this_node:
-            if self.router_state == 'primary':
-                if zkhandler.readdata(self.zk_conn, '/primary_node') != self.name:
-                    zkhandler.writedata(self.zk_conn, {'/primary_node': self.name})
-
-        # Toggle state management of dead VMs to restart them
-        memalloc = 0
-        vcpualloc = 0
-        for domain, instance in self.d_domain.items():
-            if domain in self.domain_list:
-                # Add the allocated memory to our memalloc value
-                memalloc += instance.getmemory()
-                vcpualloc += instance.getvcpus()
-                if instance.getstate() == 'start' and instance.getnode() == self.name:
-                    if instance.getdom() != None:
-                        try:
-                            if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
-                                raise
-                        except Exception as e:
-                            # Toggle a state "change"
-                            zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
-
-        # Ensure that any running VMs are readded to the domain_list
-        running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
-        for domain in running_domains:
-            domain_uuid = domain.UUIDString()
-            if domain_uuid not in self.domain_list:
-                self.domain_list.append(domain_uuid)
-
-        # Set our information in zookeeper
-        #self.name = lv_conn.getHostname()
-        self.memused = int(psutil.virtual_memory().used / 1024 / 1024)
-        self.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
-        self.memalloc = memalloc
-        self.vcpualloc = vcpualloc
-        self.cpuload = os.getloadavg()[0]
-        self.domains_count = len(lv_conn.listDomainsID())
-        keepalive_time = int(time.time())
-        try:
-            zkhandler.writedata(self.zk_conn, {
-                '/nodes/{}/memused'.format(self.name): str(self.memused),
-                '/nodes/{}/memfree'.format(self.name): str(self.memfree),
-                '/nodes/{}/memalloc'.format(self.name): str(self.memalloc),
-                '/nodes/{}/vcpualloc'.format(self.name): str(self.vcpualloc),
-                '/nodes/{}/cpuload'.format(self.name): str(self.cpuload),
-                '/nodes/{}/networkscount'.format(self.name): str(self.networks_count),
-                '/nodes/{}/domainscount'.format(self.name): str(self.domains_count),
-                '/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list),
-                '/nodes/{}/keepalive'.format(self.name): str(keepalive_time)
-            })
-        except:
-            self.logger.out('Failed to set keepalive data', state='e')
-            return
-
-        # Close the Libvirt connection
-        lv_conn.close()
-
-        # Update our local node lists
-        for node_name in self.d_node:
-            try:
-                node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name))
-                node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name))
-                node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name)))
-            except:
-                node_daemon_state = 'unknown'
-                node_domain_state = 'unknown'
-                node_keepalive = 0
-
-            # Handle deadtime and fencng if needed
-            # (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
-            # out-of-date while in 'start' state)
-            node_deadtime = int(time.time()) - ( int(self.config['keepalive_interval']) * int(self.config['fence_intervals']) )
-            if node_keepalive < node_deadtime and node_daemon_state == 'run':
-                self.logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
-                zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
-                fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn, self.config, self.logger), kwargs={})
-                fence_thread.start()
-
-            # Update the arrays
-            if node_daemon_state == 'run' and node_domain_state != 'flushed' and node_name not in self.active_node_list:
-                self.active_node_list.append(node_name)
-                try:
-                    self.flushed_node_list.remove(node_name)
-                except ValueError:
-                    pass
-                try:
-                    self.inactive_node_list.remove(node_name)
-                except ValueError:
-                    pass
-            if node_daemon_state != 'run' and node_domain_state != 'flushed' and node_name not in self.inactive_node_list:
-                self.inactive_node_list.append(node_name)
-                try:
-                    self.active_node_list.remove(node_name)
-                except ValueError:
-                    pass
-                try:
-                    self.flushed_node_list.remove(node_name)
-                except ValueError:
-                    pass
-            if node_domain_state == 'flushed' and node_name not in self.flushed_node_list:
-                self.flushed_node_list.append(node_name)
-                try:
-                    self.active_node_list.remove(node_name)
-                except ValueError:
-                    pass
-                try:
-                    self.inactive_node_list.remove(node_name)
-                except ValueError:
-                    pass
-
-        # List of the non-primary coordinators
-        secondary_node_list = self.config['coordinators'].split(',')
-        if secondary_node_list:
-            secondary_node_list.remove(self.primary_node)
-            for node in secondary_node_list:
-                if node in self.inactive_node_list:
-                    secondary_node_list.remove(node)
-
-        # Display node information to the terminal
-        self.logger.out('{}{} keepalive{}'.format(self.logger.fmt_purple, self.name, self.logger.fmt_end), state='t')
-        self.logger.out(
-            '{bold}Domains:{nobold} {domcount} '
-            '{bold}Networks:{nobold} {netcount} '
-            '{bold}VM memory [MiB]:{nobold} {allocmem} '
-            '{bold}Free memory [MiB]:{nobold} {freemem} '
-            '{bold}Used memory [MiB]:{nobold} {usedmem} '
-            '{bold}Load:{nobold} {load}'.format(
-                bold=self.logger.fmt_bold,
-                nobold=self.logger.fmt_end,
-                domcount=self.domains_count,
-                freemem=self.memfree,
-                usedmem=self.memused,
-                load=self.cpuload,
-                allocmem=self.memalloc,
-                netcount=self.networks_count
-            ),
-        )
-
-        # Display cluster information to the terminal
-        self.logger.out('{}Cluster status{}'.format(self.logger.fmt_purple, self.logger.fmt_end), state='t')
-        self.logger.out('{}Primary coordinator:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, self.primary_node))
-        self.logger.out('{}Secondary coordinators:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(secondary_node_list)))
-        self.logger.out('{}Active hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.active_node_list)))
-        self.logger.out('{}Flushed hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.flushed_node_list)))
-        self.logger.out('{}Inactive nodes:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.inactive_node_list)))
-
 #
 # Find a migration target
 #
@@ -681,79 +517,3 @@ def findTargetHypervisorVMs(zk_conn, dom_uuid):
             target_node = node
 
     return target_node
-
-
-#
-# Fence thread entry function
-#
-def fenceNode(node_name, zk_conn, config, logger):
-    failcount = 0
-    # We allow exactly 3 saving throws for the host to come back online
-    while failcount < 3:
-        # Wait 5 seconds
-        time.sleep(5)
-        # Get the state
-        node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
-        # Is it still 'dead'
-        if node_daemon_state == 'dead':
-            failcount += 1
-            logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
-        # It changed back to something else so it must be alive
-        else:
-            logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
-            return
-
-    logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='e')
-
-    # Get IPMI information
-    ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
-    ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
-    ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))
-
-    # Shoot it in the head
-    fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
-    # Hold to ensure the fence takes effect
-    time.sleep(3)
-
-    # Force into secondary network state if needed
-    if node_name in config['coordinators'].split(','):
-        zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
-        if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
-            zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
-
-    # If the fence succeeded and successful_fence is migrate
-    if fence_status == True and config['successful_fence'] == 'migrate':
-        migrateFromFencedNode(zk_conn, node_name, logger)
-    # If the fence failed and failed_fence is migrate
-    if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
-        migrateFromFencedNode(zk_conn, node_name, logger)
-
-# Migrate hosts away from a fenced node
-def migrateFromFencedNode(zk_conn, node_name, logger):
-    logger.out('Moving VMs from dead node "{}" to new hosts'.format(node_name), state='i')
-    dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
-    for dom_uuid in dead_node_running_domains:
-        target_node = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
-
-        logger.out('Moving VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
-        zkhandler.writedata(zk_conn, {
-            '/domains/{}/state'.format(dom_uuid): 'start',
-            '/domains/{}/node'.format(dom_uuid): target_node,
-            '/domains/{}/lastnode'.format(dom_uuid): node_name
-        })
-
-    # Set node in flushed state for easy remigrating when it comes back
-    zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })
-
-#
-# Perform an IPMI fence
-#
-def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
-    ipmi_command = ['/usr/bin/ipmitool', '-I', 'lanplus', '-H', ipmi_hostname, '-U', ipmi_user, '-P', ipmi_password, 'chassis', 'power', 'reset']
-    ipmi_command_output = subprocess.run(ipmi_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    if ipmi_command_output.returncode == 0:
-        logger.out('Successfully rebooted dead node', state='o')
-        return True
-    else:
-        logger.out('Failed to reboot dead node', state='e')
-        return False
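Both the `killall dnsmasq` call in `cleanup()` and the new `rebootViaIPMI()` in fencing.py below rely on `common.run_os_command()` returning a `(retcode, stdout, stderr)` tuple, replacing the direct `subprocess.run()` call removed above. That helper is not shown in this diff; a minimal sketch of what it is assumed to look like:

```python
# Hypothetical sketch of pvcd/common.py's run_os_command(); the real helper is not part of this diff
import shlex
import subprocess

def run_os_command(command_string, timeout=None):
    # Split the flat command string into an argv list and run it, capturing output
    command = shlex.split(command_string)
    command_output = subprocess.run(
        command,
        timeout=timeout,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    retcode = command_output.returncode
    stdout = command_output.stdout.decode('utf-8')
    stderr = command_output.stderr.decode('utf-8')
    return retcode, stdout, stderr
```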
diff --git a/node-daemon/pvcd/fencing.py b/node-daemon/pvcd/fencing.py
new file mode 100644
index 00000000..4f0201a7
--- /dev/null
+++ b/node-daemon/pvcd/fencing.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+
+# fencing.py - PVC daemon function library, node fencing functions
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import threading
+import time
+
+import pvcd.zkhandler as zkhandler
+import pvcd.common as common
+
+from pvcd.NodeInstance import findTargetHypervisor
+
+#
+# Fence thread entry function
+#
+def fenceNode(node_name, zk_conn, config, logger):
+    failcount = 0
+    # We allow exactly 3 saving throws for the host to come back online
+    while failcount < 3:
+        # Wait 5 seconds
+        time.sleep(5)
+        # Get the state
+        node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
+        # Is it still 'dead'
+        if node_daemon_state == 'dead':
+            failcount += 1
+            logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
+        # It changed back to something else so it must be alive
+        else:
+            logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
+            return
+
+    logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='w')
+
+    # Get IPMI information
+    ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
+    ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
+    ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))
+
+    # Shoot it in the head
+    fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
+    # Hold to ensure the fence takes effect
+    time.sleep(3)
+
+    # Force into secondary network state if needed
+    if node_name in config['coordinators'].split(','):
+        zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
+        if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
+            zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
+
+    # If the fence succeeded and successful_fence is migrate
+    if fence_status == True and config['successful_fence'] == 'migrate':
+        migrateFromFencedNode(zk_conn, node_name, logger)
+
+    # If the fence failed and failed_fence is migrate
+    if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
+        migrateFromFencedNode(zk_conn, node_name, logger)
+
+# Migrate VMs away from a fenced node
+def migrateFromFencedNode(zk_conn, node_name, logger):
+    logger.out('Moving VMs from dead node "{}" to new hosts'.format(node_name), state='i')
+    dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
+    for dom_uuid in dead_node_running_domains:
+        target_node = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
+
+        logger.out('Moving VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
+        zkhandler.writedata(zk_conn, {
+            '/domains/{}/state'.format(dom_uuid): 'start',
+            '/domains/{}/node'.format(dom_uuid): target_node,
+            '/domains/{}/lastnode'.format(dom_uuid): node_name
+        })
+
+    # Set node in flushed state for easy remigrating when it comes back
+    zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })
+
+#
+# Perform an IPMI fence
+#
+def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
+    ipmi_command = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power reset'.format(
+        ipmi_hostname, ipmi_user, ipmi_password
+    )
+    ipmi_command_retcode, ipmi_command_stdout, ipmi_command_stderr = common.run_os_command(ipmi_command)
+    if ipmi_command_retcode == 0:
+        logger.out('Successfully rebooted dead node', state='o')
+        return True
+    else:
+        logger.out('Failed to reboot dead node', state='e')
+        return False
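To make the fencing timing concrete, here is a worked example of the dead-node check from `update_zookeeper()` combined with the saving-throw loop above, using `keepalive_interval = 5` and `fence_intervals = 6` purely as sample values (both come from the daemon configuration and may differ in a real deployment):

```python
import time

# Sample configuration values (assumptions for illustration only)
config = {'keepalive_interval': '5', 'fence_intervals': '6'}

# Pretend this node last wrote a keepalive 31 seconds ago while in 'run' state
node_keepalive = int(time.time()) - 31
node_daemon_state = 'run'

# Same check as in update_zookeeper(): a node is dead once its keepalive is older
# than keepalive_interval * fence_intervals seconds (here 5 * 6 = 30 seconds)
node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
if node_keepalive < node_deadtime and node_daemon_state == 'run':
    print('node marked dead; fencing.fenceNode() would be started in a thread')

# fenceNode() then waits up to 3 * 5 = 15 further seconds (the "saving throws")
# for a fresh keepalive before issuing the IPMI chassis power reset.
```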