#!/usr/bin/env python3 # Daemon.py - PVC hypervisor router daemon # Part of the Parallel Virtual Cluster (PVC) system # # Copyright (C) 2018 Joshua M. Boniface # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # ############################################################################### import kazoo.client import sys import os import socket import psutil import subprocess import time import configparser import signal import atexit import apscheduler.schedulers.background import daemon_lib.ansiiprint as ansiiprint import daemon_lib.zkhandler as zkhandler import daemon_lib.common as common import pvcrd.RouterInstance as RouterInstance import pvcrd.VXNetworkInstance as VXNetworkInstance print(ansiiprint.bold() + "pvcrd - Parallel Virtual Cluster router daemon" + ansiiprint.end()) # Set sysctl to enable routing before we do anything else common.run_os_command('sysctl net.ipv4.ip_forward=1') common.run_os_command('sysctl net.ipv4.conf.all.send_redirects=1') common.run_os_command('sysctl net.ipv4.conf.all.rp_filter=0') common.run_os_command('sysctl net.ipv4.conf.default.rp_filter=0') common.run_os_command('sysctl net.ipv4.conf.all.accept_source_route=1') common.run_os_command('sysctl net.ipv4.conf.all.accept_source_route=1') common.run_os_command('sysctl net.ipv6.ip_forward=1') common.run_os_command('sysctl net.ipv6.conf.all.rp_filter=0') common.run_os_command('sysctl net.ipv6.conf.default.rp_filter=0') common.run_os_command('sysctl net.ipv6.conf.all.send_redirects=1') common.run_os_command('sysctl net.ipv6.conf.all.accept_source_route=1') # Get the config file variable from the environment try: pvcrd_config_file = os.environ['PVCRD_CONFIG_FILE'] except: print('ERROR: The "PVCRD_CONFIG_FILE" environment variable must be set before starting pvcrd.') exit(1) myhostname = socket.gethostname() myshorthostname = myhostname.split('.', 1)[0] mynetworkname = ''.join(myhostname.split('.', 1)[1:]) # Config values dictionary config_values = [ 'zookeeper', 'keepalive_interval', 'keepalive_interval', 'fence_intervals', 'vni_dev', 'vni_dev_ip', 'ipmi_hostname', 'ipmi_username', 'ipmi_password' ] def readConfig(pvcrd_config_file, myhostname): print('Loading configuration from file {}'.format(pvcrd_config_file)) o_config = configparser.ConfigParser() o_config.read(pvcrd_config_file) config = {} config['pvcrd_config_file'] = pvcrd_config_file try: entries = o_config[myhostname] except: try: entries = o_config['default'] except Exception as e: print('ERROR: Config file is not valid!') exit(1) for entry in config_values: try: config[entry] = entries[entry] except: try: config[entry] = o_config['default'][entry] except: print('ERROR: Config file missing required value "{}" for this host!'.format(entry)) exit(1) # Handle an empty ipmi_hostname if config['ipmi_hostname'] == '': config['ipmi_hostname'] = myshorthostname + '-lom.' + mynetworkname return config # Get config config = readConfig(pvcrd_config_file, myhostname) # Add some static config elements config['nftables_rules_dir'] = '/var/lib/pvc/nftables' config['dnsmasq_hosts_dir'] = '/var/lib/pvc/dnsmasq' # Set up our VNI interface vni_dev = config['vni_dev'] vni_dev_ip = config['vni_dev_ip'] print('Setting up VNI interface {} with IP {}'.format(vni_dev, vni_dev_ip)) common.run_os_command('ip link set {} up'.format(vni_dev)) common.run_os_command('ip address add {} dev {}'.format(vni_dev_ip, vni_dev)) # Connect to local zookeeper zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) try: print('Connecting to Zookeeper instance at {}'.format(config['zookeeper'])) zk_conn.start() except: print('ERROR: Failed to connect to Zookeeper') exit(1) # Handle zookeeper failures def zk_listener(state): global zk_conn, update_timer if state == kazoo.client.KazooState.SUSPENDED: ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e') # Stop keepalive thread stopKeepaliveTimer(update_timer) while True: _zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) try: _zk_conn.start() zk_conn = _zk_conn break except: time.sleep(1) elif state == kazoo.client.KazooState.CONNECTED: ansiiprint.echo('Connection to Zookeeper started', '', 'o') # Start keepalive thread update_timer = createKeepaliveTimer() else: pass zk_conn.add_listener(zk_listener) # Cleanup function def cleanup(): ansiiprint.echo('Cleaning up', '', 'e') # Stop keepalive thread stopKeepaliveTimer(update_timer) # Set stop state in Zookeeper zkhandler.writedata(zk_conn, {'/routers/{}/daemonstate'.format(myhostname): 'stop'}) if this_router.name == this_router.primary_router: zkhandler.writedata(zk_conn, {'/routers': 'none'}) # Wait for everything to flush time.sleep(3) # Close the Zookeeper connection try: zk_conn.stop() zk_conn.close() except: pass ansiiprint.echo('Terminating daemon', '', 'e') atexit.register(cleanup) # Gather useful data about our host for staticdata # Static data format: 'cpu_count', 'arch', 'os', 'kernel' staticdata = [] staticdata.append(str(psutil.cpu_count())) staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) # Print static data on start print('{0}Router hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), myhostname)) print('{0}IPMI hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), config['ipmi_hostname'])) print('{0}Machine details:{1}'.format(ansiiprint.bold(), ansiiprint.end())) print(' {0}CPUs:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[0])) print(' {0}Arch:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[3])) print(' {0}OS:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[2])) print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[1])) # Check if our router exists in Zookeeper, and create it if not if zk_conn.exists('/routers/{}'.format(myhostname)): print("Router is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") # Update static data just in case it's changed zkhandler.writedata(zk_conn, { '/routers/{}/staticdata'.format(myhostname): ' '.join(staticdata) }) else: print("Router is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new router") keepalive_time = int(time.time()) transaction = zk_conn.transaction() transaction.create('/routers/{}'.format(myhostname), 'hypervisor'.encode('ascii')) # Basic state information transaction.create('/routers/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii')) transaction.create('/routers/{}/networkstate'.format(myhostname), 'secondary'.encode('ascii')) transaction.create('/routers/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii')) transaction.create('/routers/{}/cpuload'.format(myhostname), '0'.encode('ascii')) # Keepalives and fencing information transaction.create('/routers/{}/keepalive'.format(myhostname), str(keepalive_time).encode('ascii')) transaction.create('/routers/{}/ipmihostname'.format(myhostname), config['ipmi_hostname'].encode('ascii')) transaction.create('/routers/{}/ipmiusername'.format(myhostname), config['ipmi_username'].encode('ascii')) transaction.create('/routers/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) transaction.commit() # Check that the primary key exists, and create it with us as master if not current_primary = zkhandler.readdata(zk_conn, '/routers') if current_primary: print('Current primary router is {}"{}"{}.'.format(ansiiprint.blue(), current_primary, ansiiprint.end())) else: print('No primary router key found; creating with us as primary.') zkhandler.writedata(zk_conn, { '/routers': myhostname }) zkhandler.writedata(zk_conn, { '/routers/{}/daemonstate'.format(myhostname): 'init' }) t_router = dict() s_network = dict() router_list = [] network_list = [] # Create our config dirs common.run_os_command( '/bin/mkdir --parents {}/networks'.format( config['nftables_rules_dir'] ) ) common.run_os_command( '/bin/mkdir --parents {}/static'.format( config['nftables_rules_dir'] ) ) common.run_os_command( '/bin/mkdir --parents {}'.format( config['dnsmasq_hosts_dir'] ) ) # Set up the basic features of the nftables firewall nftables_base_rules = """# Base rules flush ruleset # Add the filter table and chains add table inet filter add chain inet filter forward {{ type filter hook forward priority 0; }} add chain inet filter input {{ type filter hook input priority 0; }} # Include static rules and network rules include "{rulesdir}/static/*" include "{rulesdir}/networks/*" """.format( rulesdir=config['nftables_rules_dir'] ) # Write the basic firewall config print(nftables_base_rules) nftables_base_filename = '{}/base.nft'.format(config['nftables_rules_dir']) nftables_update_filename = '{}/update'.format(config['nftables_rules_dir']) with open(nftables_base_filename, 'w') as nfbasefile: nfbasefile.write(nftables_base_rules) open(nftables_update_filename, 'a').close() # # Router instances # @zk_conn.ChildrenWatch('/routers') def updaterouters(new_router_list): global router_list router_list = new_router_list print(ansiiprint.blue() + 'Router list: ' + ansiiprint.end() + '{}'.format(' '.join(router_list))) for router in router_list: if router in t_router: t_router[router].updaterouterlist(t_router) else: t_router[router] = RouterInstance.RouterInstance(myhostname, router, t_router, s_network, zk_conn, config) # Set up our update function this_router = t_router[myhostname] update_zookeeper = this_router.update_zookeeper # # Network instances # @zk_conn.ChildrenWatch('/networks') def updatenetworks(new_network_list): global network_list for network in new_network_list: if not network in s_network: s_network[network] = VXNetworkInstance.VXNetworkInstance(network, zk_conn, config, t_router[myhostname]) if this_router.network_state == 'primary': s_network[network].createGatewayAddress() s_network[network].startDHCPServer() for network in network_list: if not network in new_network_list: if this_router.network_state == 'primary': s_network[network].stopDHCPServer() s_network[network].removeGatewayAddress() s_network[network].removeFirewall() s_network[network].removeNetwork() del(s_network[network]) network_list = new_network_list for router in router_list: if router in t_router: t_router[router].updatenetworklist(s_network) print(ansiiprint.blue() + 'Network list: ' + ansiiprint.end() + '{}'.format(' '.join(network_list))) # Create timer to update this router in Zookeeper def createKeepaliveTimer(): interval = int(config['keepalive_interval']) ansiiprint.echo('Starting keepalive timer ({} second interval)'.format(interval), '', 'o') update_timer = apscheduler.schedulers.background.BackgroundScheduler() update_timer.add_job(update_zookeeper, 'interval', seconds=interval) update_timer.start() return update_timer def stopKeepaliveTimer(update_timer): ansiiprint.echo('Stopping keepalive timer', '', 'c') update_timer.shutdown() # Start keepalive thread update_timer = createKeepaliveTimer() update_zookeeper() # Tick loop while True: try: time.sleep(0.5) except: break