Merge node common with daemon common

Joshua Boniface 2021-06-01 12:17:25 -04:00
parent f73c433fc7
commit 9764090d6d
9 changed files with 146 additions and 337 deletions

View File

@@ -22,45 +22,99 @@
 import time
 import uuid
 import lxml
-import shlex
 import subprocess
+import signal
 
 from json import loads
 from re import match as re_match
 from distutils.util import strtobool
+from threading import Thread
+from shlex import split as shlex_split
 
 ###############################################################################
 # Supplemental functions
 ###############################################################################
 
+#
+# Run a local OS daemon in the background
+#
+class OSDaemon(object):
+    def __init__(self, command_string, environment, logfile):
+        command = shlex_split(command_string)
+        # Set stdout to be a logfile if set
+        if logfile:
+            stdout = open(logfile, 'a')
+        else:
+            stdout = subprocess.PIPE
+
+        # Invoke the process
+        self.proc = subprocess.Popen(
+            command,
+            env=environment,
+            stdout=stdout,
+            stderr=stdout,
+        )
+
+    # Signal the process
+    def signal(self, sent_signal):
+        signal_map = {
+            'hup': signal.SIGHUP,
+            'int': signal.SIGINT,
+            'term': signal.SIGTERM,
+            'kill': signal.SIGKILL
+        }
+        self.proc.send_signal(signal_map[sent_signal])
+
+
+def run_os_daemon(command_string, environment=None, logfile=None):
+    daemon = OSDaemon(command_string, environment, logfile)
+    return daemon
+
+
 #
 # Run a local OS command via shell
 #
-def run_os_command(command_string, background=False, environment=None, timeout=None, shell=False):
-    command = shlex.split(command_string)
-    try:
-        command_output = subprocess.run(
-            command,
-            shell=shell,
-            env=environment,
-            timeout=timeout,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        )
-        retcode = command_output.returncode
-    except subprocess.TimeoutExpired:
-        retcode = 128
+def run_os_command(command_string, background=False, environment=None, timeout=None):
+    command = shlex_split(command_string)
+    if background:
+        def runcmd():
+            try:
+                subprocess.run(
+                    command,
+                    env=environment,
+                    timeout=timeout,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                )
+            except subprocess.TimeoutExpired:
+                pass
+        thread = Thread(target=runcmd, args=())
+        thread.start()
+        return 0, None, None
+    else:
+        try:
+            command_output = subprocess.run(
+                command,
+                env=environment,
+                timeout=timeout,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+            )
+            retcode = command_output.returncode
+        except subprocess.TimeoutExpired:
+            retcode = 128
+        except Exception:
+            retcode = 255
 
-    try:
-        stdout = command_output.stdout.decode('ascii')
-    except Exception:
-        stdout = ''
-    try:
-        stderr = command_output.stderr.decode('ascii')
-    except Exception:
-        stderr = ''
-    return retcode, stdout, stderr
+        try:
+            stdout = command_output.stdout.decode('ascii')
+        except Exception:
+            stdout = ''
+        try:
+            stderr = command_output.stderr.decode('ascii')
+        except Exception:
+            stderr = ''
+        return retcode, stdout, stderr
 
 
 #
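For reference, a minimal usage sketch of the merged run_os_command helper; the command strings and timeout are illustrative, not part of this commit. In the foreground case the return code and decoded output come back directly; with background=True the command is fired on a throwaway Thread and the caller immediately receives (0, None, None).

import daemon_lib.common as common

# Foreground: blocks until the command exits; a timeout yields retcode 128
retcode, stdout, stderr = common.run_os_command('ip link show', timeout=5)
if retcode != 0:
    print('command failed: {}'.format(stderr))

# Background: launched on a separate Thread, output is discarded
retcode, stdout, stderr = common.run_os_command('sync', background=True)
# retcode is always 0 here and stdout/stderr are None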
@@ -449,7 +503,9 @@ def findTargetNode(zkhandler, dom_uuid):
     return None
 
 
+#
 # Get the list of valid target nodes
+#
 def getNodes(zkhandler, node_limit, dom_uuid):
     valid_node_list = []
     full_node_list = zkhandler.children('/nodes')
@@ -473,7 +529,9 @@ def getNodes(zkhandler, node_limit, dom_uuid):
     return valid_node_list
 
 
+#
 # via free memory (relative to allocated memory)
+#
 def findTargetNodeMem(zkhandler, node_limit, dom_uuid):
     most_provfree = 0
     target_node = None
@@ -493,7 +551,9 @@ def findTargetNodeMem(zkhandler, node_limit, dom_uuid):
     return target_node
 
 
+#
 # via load average
+#
 def findTargetNodeLoad(zkhandler, node_limit, dom_uuid):
     least_load = 9999.0
     target_node = None
@@ -509,7 +569,9 @@ def findTargetNodeLoad(zkhandler, node_limit, dom_uuid):
     return target_node
 
 
+#
 # via total vCPUs
+#
 def findTargetNodeVCPUs(zkhandler, node_limit, dom_uuid):
     least_vcpus = 9999
     target_node = None
@@ -525,7 +587,9 @@ def findTargetNodeVCPUs(zkhandler, node_limit, dom_uuid):
     return target_node
 
 
+#
 # via total VMs
+#
 def findTargetNodeVMs(zkhandler, node_limit, dom_uuid):
     least_vms = 9999
     target_node = None
@@ -541,7 +605,9 @@ def findTargetNodeVMs(zkhandler, node_limit, dom_uuid):
     return target_node
 
 
-# Connect to the primary host and run a command
+#
+# Connect to the primary node and run a command
+#
 def runRemoteCommand(node, command, become=False):
     import paramiko
     import hashlib
@@ -571,3 +637,47 @@ def runRemoteCommand(node, command, become=False):
     ssh_client.connect(node)
     stdin, stdout, stderr = ssh_client.exec_command(command)
     return stdout.read().decode('ascii').rstrip(), stderr.read().decode('ascii').rstrip()
+
+
+#
+# Reload the firewall rules of the system
+#
+def reload_firewall_rules(rules_file, logger=None):
+    if logger is not None:
+        logger.out('Reloading firewall configuration', state='o')
+
+    retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file))
+    if retcode != 0 and logger is not None:
+        logger.out('Failed to reload configuration: {}'.format(stderr), state='e')
+
+
+#
+# Create an IP address
+#
+def createIPAddress(ipaddr, cidrnetmask, dev):
+    run_os_command(
+        'ip address add {}/{} dev {}'.format(
+            ipaddr,
+            cidrnetmask,
+            dev
+        )
+    )
+    run_os_command(
+        'arping -P -U -W 0.02 -c 2 -i {dev} -S {ip} {ip}'.format(
+            dev=dev,
+            ip=ipaddr
+        )
+    )
+
+
+#
+# Remove an IP address
+#
+def removeIPAddress(ipaddr, cidrnetmask, dev):
+    run_os_command(
+        'ip address delete {}/{} dev {}'.format(
+            ipaddr,
+            cidrnetmask,
+            dev
+        )
+    )
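The relocated address helpers can be exercised directly from the shared module; a short sketch with a hypothetical interface name and address, not taken from this commit. createIPAddress adds the address and then announces it with gratuitous ARP via arping, which makes it suitable for floating addresses; removeIPAddress simply deletes it.

import daemon_lib.common as common

# Hypothetical example: bring up a floating address on a bridge device
common.createIPAddress('10.0.0.254', 24, 'brcluster')

# ... later, when the address should move elsewhere ...
common.removeIPAddress('10.0.0.254', 24, 'brcluster')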

View File

@@ -23,7 +23,7 @@ import time
 import json
 import psutil
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 
 class CephOSDInstance(object):

View File

@@ -26,7 +26,7 @@ import psycopg2
 from threading import Thread, Event
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 
 class DNSAggregatorInstance(object):

View File

@@ -44,7 +44,7 @@ from daemon_lib.zkhandler import ZKHandler
 import pvcnoded.log as log
 import pvcnoded.fencing as fencing
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 import pvcnoded.VMInstance as VMInstance
 import pvcnoded.NodeInstance as NodeInstance
@@ -782,7 +782,7 @@ if enable_networking:
     nftables_base_filename = '{}/base.nft'.format(config['nft_dynamic_directory'])
     with open(nftables_base_filename, 'w') as nfbasefile:
         nfbasefile.write(nftables_base_rules)
-    common.reload_firewall_rules(logger, nftables_base_filename)
+    common.reload_firewall_rules(nftables_base_filename, logger=logger)
 
 ###############################################################################
 # PHASE 7d - Ensure DNSMASQ is not running
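This call-site change tracks the new reload_firewall_rules signature, which takes the rules file first and accepts the logger only as an optional keyword, so the helper also works where no logger object is available. A sketch of the new form, using a stand-in logger and an example path:

import daemon_lib.common as common


class PrintLogger:
    # Minimal stand-in for the PVC Logger object the daemons normally pass
    def out(self, message, state='i', prefix=None):
        print('[{}] {}'.format(state, message))


# With logging, as Daemon.py does above (the path is an example)
common.reload_firewall_rules('/tmp/base.nft', logger=PrintLogger())

# Without a logger: failures are simply not reported
common.reload_firewall_rules('/tmp/base.nft')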

View File

@@ -23,7 +23,7 @@ import time
 from threading import Thread
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 
 class NodeInstance(object):

View File

@@ -28,7 +28,7 @@ from threading import Thread
 from xml.etree import ElementTree
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance

View File

@@ -24,7 +24,7 @@ import time
 from textwrap import dedent
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 
 
 class VXNetworkInstance(object):
@@ -452,7 +452,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
         # Reload firewall rules
         nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
-        common.reload_firewall_rules(self.logger, nftables_base_filename)
+        common.reload_firewall_rules(nftables_base_filename, logger=self.logger)
 
     # Create bridged network configuration
     def createNetworkBridged(self):
@@ -798,7 +798,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
         # Reload firewall rules
         nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
-        common.reload_firewall_rules(self.logger, nftables_base_filename)
+        common.reload_firewall_rules(nftables_base_filename, logger=self.logger)
 
     def removeGateways(self):
         if self.nettype == 'managed':

View File

@@ -1,301 +0,0 @@
#!/usr/bin/env python3

# common.py - PVC daemon function library, common fuctions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import subprocess
import signal

from threading import Thread
from shlex import split as shlex_split


class OSDaemon(object):
    def __init__(self, command_string, environment, logfile):
        command = shlex_split(command_string)
        # Set stdout to be a logfile if set
        if logfile:
            stdout = open(logfile, 'a')
        else:
            stdout = subprocess.PIPE

        # Invoke the process
        self.proc = subprocess.Popen(
            command,
            env=environment,
            stdout=stdout,
            stderr=stdout,
        )

    # Signal the process
    def signal(self, sent_signal):
        signal_map = {
            'hup': signal.SIGHUP,
            'int': signal.SIGINT,
            'term': signal.SIGTERM,
            'kill': signal.SIGKILL
        }
        self.proc.send_signal(signal_map[sent_signal])


def run_os_daemon(command_string, environment=None, logfile=None):
    daemon = OSDaemon(command_string, environment, logfile)
    return daemon


# Run a oneshot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None, timeout=None):
    command = shlex_split(command_string)
    if background:
        def runcmd():
            try:
                subprocess.run(
                    command,
                    env=environment,
                    timeout=timeout,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except subprocess.TimeoutExpired:
                pass
        thread = Thread(target=runcmd, args=())
        thread.start()
        return 0, None, None
    else:
        try:
            command_output = subprocess.run(
                command,
                env=environment,
                timeout=timeout,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            retcode = command_output.returncode
        except subprocess.TimeoutExpired:
            retcode = 128
        except Exception:
            retcode = 255

        try:
            stdout = command_output.stdout.decode('ascii')
        except Exception:
            stdout = ''
        try:
            stderr = command_output.stderr.decode('ascii')
        except Exception:
            stderr = ''
        return retcode, stdout, stderr


# Reload the firewall rules of the system
def reload_firewall_rules(logger, rules_file):
    logger.out('Reloading firewall configuration', state='o')
    retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file))
    if retcode != 0:
        logger.out('Failed to reload configuration: {}'.format(stderr), state='e')


# Create IP address
def createIPAddress(ipaddr, cidrnetmask, dev):
    run_os_command(
        'ip address add {}/{} dev {}'.format(
            ipaddr,
            cidrnetmask,
            dev
        )
    )
    run_os_command(
        'arping -P -U -W 0.02 -c 2 -i {dev} -S {ip} {ip}'.format(
            dev=dev,
            ip=ipaddr
        )
    )


# Remove IP address
def removeIPAddress(ipaddr, cidrnetmask, dev):
    run_os_command(
        'ip address delete {}/{} dev {}'.format(
            ipaddr,
            cidrnetmask,
            dev
        )
    )


#
# Find a migration target
#
def findTargetNode(zkhandler, config, logger, dom_uuid):
    # Determine VM node limits; set config value if read fails
    try:
        node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid)).split(',')
        if not any(node_limit):
            node_limit = ''
    except Exception:
        node_limit = ''
        zkhandler.write([
            ('/domains/{}/node_limit'.format(dom_uuid), '')
        ])

    # Determine VM search field
    try:
        search_field = zkhandler.read('/domains/{}/node_selector'.format(dom_uuid))
    except Exception:
        search_field = None

    # If our search field is invalid, use the default
    if search_field is None or search_field == 'None':
        search_field = zkhandler.read('/config/migration_target_selector')

    if config['debug']:
        logger.out('Migrating VM {} with selector {}'.format(dom_uuid, search_field), state='d', prefix='node-flush')

    # Execute the search
    if search_field == 'mem':
        return findTargetNodeMem(zkhandler, config, logger, node_limit, dom_uuid)
    if search_field == 'load':
        return findTargetNodeLoad(zkhandler, config, logger, node_limit, dom_uuid)
    if search_field == 'vcpus':
        return findTargetNodeVCPUs(zkhandler, config, logger, node_limit, dom_uuid)
    if search_field == 'vms':
        return findTargetNodeVMs(zkhandler, config, logger, node_limit, dom_uuid)

    # Nothing was found
    return None


# Get the list of valid target nodes
def getNodes(zkhandler, node_limit, dom_uuid):
    valid_node_list = []
    full_node_list = zkhandler.children('/nodes')
    current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid))

    for node in full_node_list:
        if node_limit and node not in node_limit:
            continue

        daemon_state = zkhandler.read('/nodes/{}/daemonstate'.format(node))
        domain_state = zkhandler.read('/nodes/{}/domainstate'.format(node))

        if node == current_node:
            continue

        if daemon_state != 'run' or domain_state != 'ready':
            continue

        valid_node_list.append(node)

    return valid_node_list


# via free memory (relative to allocated memory)
def findTargetNodeMem(zkhandler, config, logger, node_limit, dom_uuid):
    most_provfree = 0
    target_node = None

    node_list = getNodes(zkhandler, node_limit, dom_uuid)
    if config['debug']:
        logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush')

    for node in node_list:
        memprov = int(zkhandler.read('/nodes/{}/memprov'.format(node)))
        memused = int(zkhandler.read('/nodes/{}/memused'.format(node)))
        memfree = int(zkhandler.read('/nodes/{}/memfree'.format(node)))
        memtotal = memused + memfree
        provfree = memtotal - memprov

        if config['debug']:
            logger.out('Evaluating node {} with {} provfree'.format(node, provfree), state='d', prefix='node-flush')

        if provfree > most_provfree:
            most_provfree = provfree
            target_node = node

    if config['debug']:
        logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')

    return target_node


# via load average
def findTargetNodeLoad(zkhandler, config, logger, node_limit, dom_uuid):
    least_load = 9999.0
    target_node = None

    node_list = getNodes(zkhandler, node_limit, dom_uuid)
    if config['debug']:
        logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush')

    for node in node_list:
        load = float(zkhandler.read('/nodes/{}/cpuload'.format(node)))

        if config['debug']:
            logger.out('Evaluating node {} with load {}'.format(node, load), state='d', prefix='node-flush')

        if load < least_load:
            least_load = load
            target_node = node

    if config['debug']:
        logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')

    return target_node


# via total vCPUs
def findTargetNodeVCPUs(zkhandler, config, logger, node_limit, dom_uuid):
    least_vcpus = 9999
    target_node = None

    node_list = getNodes(zkhandler, node_limit, dom_uuid)
    if config['debug']:
        logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush')

    for node in node_list:
        vcpus = int(zkhandler.read('/nodes/{}/vcpualloc'.format(node)))

        if config['debug']:
            logger.out('Evaluating node {} with vcpualloc {}'.format(node, vcpus), state='d', prefix='node-flush')

        if vcpus < least_vcpus:
            least_vcpus = vcpus
            target_node = node

    if config['debug']:
        logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')

    return target_node


# via total VMs
def findTargetNodeVMs(zkhandler, config, logger, node_limit, dom_uuid):
    least_vms = 9999
    target_node = None

    node_list = getNodes(zkhandler, node_limit, dom_uuid)
    if config['debug']:
        logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush')

    for node in node_list:
        vms = int(zkhandler.read('/nodes/{}/domainscount'.format(node)))

        if config['debug']:
            logger.out('Evaluating node {} with VM count {}'.format(node, vms), state='d', prefix='node-flush')

        if vms < least_vms:
            least_vms = vms
            target_node = node

    if config['debug']:
        logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')

    return target_node

View File

@@ -21,7 +21,7 @@
 import time
 
-import pvcnoded.common as common
+import daemon_lib.common as common
 import pvcnoded.VMInstance as VMInstance
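Since every node daemon module now imports daemon_lib.common, the OSDaemon wrapper merged above is reachable from the same place; a minimal sketch of starting and signalling a long-running helper process (the command and logfile path are illustrative, not from this commit):

import daemon_lib.common as common

# Launch a long-running process, appending its output to a logfile
proc = common.run_os_daemon(
    '/usr/sbin/dnsmasq --keep-in-foreground --conf-file=/tmp/example.conf',
    environment=None,
    logfile='/tmp/dnsmasq-example.log'
)

# ... later, ask it to terminate cleanly ...
proc.signal('term')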