Clean up imports

Make several imports more specific to reduce redundant code imports and
improve memory utilization.
This commit is contained in:
Joshua Boniface 2020-08-11 11:46:41 -04:00
parent 46ffe352e3
commit 0e5c681ada
11 changed files with 37 additions and 55 deletions

View File

@ -21,7 +21,6 @@
############################################################################### ###############################################################################
import time import time
import ast
import json import json
import psutil import psutil

View File

@ -20,14 +20,13 @@
# #
############################################################################### ###############################################################################
import os
import sys
import time import time
import threading
import dns.zone import dns.zone
import dns.query import dns.query
import psycopg2 import psycopg2
from threading import Thread, Event
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common import pvcnoded.common as common
@ -292,7 +291,7 @@ class AXFRDaemonInstance(object):
self.config = self.aggregator.config self.config = self.aggregator.config
self.logger = self.aggregator.logger self.logger = self.aggregator.logger
self.dns_networks = self.aggregator.dns_networks self.dns_networks = self.aggregator.dns_networks
self.thread_stopper = threading.Event() self.thread_stopper = Event()
self.thread = None self.thread = None
self.sql_conn = None self.sql_conn = None
@ -302,7 +301,7 @@ class AXFRDaemonInstance(object):
def start(self): def start(self):
# Create the thread # Create the thread
self.thread_stopper.clear() self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={}) self.thread = Thread(target=self.run, args=(), kwargs={})
# Start a local instance of the SQL connection # Start a local instance of the SQL connection
# Trying to use the instance from the main DNS Aggregator can result in connection failures # Trying to use the instance from the main DNS Aggregator can result in connection failures

View File

@ -28,20 +28,17 @@ import libvirt
import sys import sys
import os import os
import signal import signal
import atexit
import socket
import psutil import psutil
import subprocess import subprocess
import uuid
import time import time
import re import re
import configparser
import threading
import yaml import yaml
import json import json
import ipaddress
import apscheduler.schedulers.background
from socket import gethostname
from threading import Thread
from ipaddress import ip_address, ip_network
from apscheduler.schedulers.background import BackgroundScheduler
from distutils.util import strtobool from distutils.util import strtobool
from queue import Queue from queue import Queue
from xml.etree import ElementTree from xml.etree import ElementTree
@ -80,7 +77,7 @@ import pvcnoded.MetadataAPIInstance as MetadataAPIInstance
# Create timer to update this node in Zookeeper # Create timer to update this node in Zookeeper
def startKeepaliveTimer(): def startKeepaliveTimer():
# Create our timer object # Create our timer object
update_timer = apscheduler.schedulers.background.BackgroundScheduler() update_timer = BackgroundScheduler()
interval = int(config['keepalive_interval']) interval = int(config['keepalive_interval'])
logger.out('Starting keepalive timer ({} second interval)'.format(interval), state='s') logger.out('Starting keepalive timer ({} second interval)'.format(interval), state='s')
update_timer.add_job(node_keepalive, 'interval', seconds=interval) update_timer.add_job(node_keepalive, 'interval', seconds=interval)
@ -108,7 +105,7 @@ except:
exit(1) exit(1)
# Set local hostname and domain variables # Set local hostname and domain variables
myfqdn = socket.gethostname() myfqdn = gethostname()
#myfqdn = 'pvc-hv1.domain.net' #myfqdn = 'pvc-hv1.domain.net'
myhostname = myfqdn.split('.', 1)[0] myhostname = myfqdn.split('.', 1)[0]
mydomainname = ''.join(myfqdn.split('.', 1)[1:]) mydomainname = ''.join(myfqdn.split('.', 1)[1:])
@ -234,7 +231,7 @@ def readConfig(pvcnoded_config_file, myhostname):
# Verify the network provided is valid # Verify the network provided is valid
try: try:
network = ipaddress.ip_network(config[network_key]) network = ip_network(config[network_key])
except Exception as e: except Exception as e:
print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key)) print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key))
exit(1) exit(1)
@ -251,7 +248,7 @@ def readConfig(pvcnoded_config_file, myhostname):
try: try:
# Set the ipaddr # Set the ipaddr
floating_addr = ipaddress.ip_address(config[floating_key].split('/')[0]) floating_addr = ip_address(config[floating_key].split('/')[0])
# Verify we're in the network # Verify we're in the network
if not floating_addr in list(network.hosts()): if not floating_addr in list(network.hosts()):
raise raise
@ -1446,13 +1443,13 @@ def node_keepalive():
# Run VM statistics collection in separate thread for parallelization # Run VM statistics collection in separate thread for parallelization
if enable_hypervisor: if enable_hypervisor:
vm_thread_queue = Queue() vm_thread_queue = Queue()
vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={}) vm_stats_thread = Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={})
vm_stats_thread.start() vm_stats_thread.start()
# Run Ceph status collection in separate thread for parallelization # Run Ceph status collection in separate thread for parallelization
if enable_storage: if enable_storage:
ceph_thread_queue = Queue() ceph_thread_queue = Queue()
ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={}) ceph_stats_thread = Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
ceph_stats_thread.start() ceph_stats_thread.start()
# Get node performance statistics # Get node performance statistics
@ -1597,7 +1594,7 @@ def node_keepalive():
# Ensures that, if we lost the lock race and come out of waiting, # Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread. # we won't try to trigger our own fence thread.
if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead': if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead':
fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={}) fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
fence_thread.start() fence_thread.start()
# Write the updated data after we start the fence thread # Write the updated data after we start the fence thread
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' }) zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })

View File

@ -22,13 +22,13 @@
import gevent.pywsgi import gevent.pywsgi
import flask import flask
import threading
import sys import sys
import time import time
import psycopg2 import psycopg2
from threading import Thread
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
# The metadata server requires client libraries
import daemon_lib.vm as pvc_vm import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network import daemon_lib.network as pvc_network
@ -105,7 +105,7 @@ class MetadataAPIInstance(object):
def start(self): def start(self):
# Launch Metadata API # Launch Metadata API
self.logger.out('Starting Metadata API at 169.254.169.254:80', state='i') self.logger.out('Starting Metadata API at 169.254.169.254:80', state='i')
self.thread = threading.Thread(target=self.launch_wsgi) self.thread = Thread(target=self.launch_wsgi)
self.thread.start() self.thread.start()
self.logger.out('Successfully started Metadata API thread', state='o') self.logger.out('Successfully started Metadata API thread', state='o')

View File

@ -20,13 +20,9 @@
# #
############################################################################### ###############################################################################
import os
import sys
import psutil
import socket
import time import time
import libvirt
import threading from threading import Thread
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
@ -119,13 +115,13 @@ class NodeInstance(object):
if self.config['enable_networking']: if self.config['enable_networking']:
if self.router_state == 'takeover': if self.router_state == 'takeover':
self.logger.out('Setting node {} to primary state'.format(self.name), state='i') self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={}) transition_thread = Thread(target=self.become_primary, args=(), kwargs={})
transition_thread.start() transition_thread.start()
if self.router_state == 'relinquish': if self.router_state == 'relinquish':
# Skip becoming secondary unless already running # Skip becoming secondary unless already running
if self.daemon_state == 'run' or self.daemon_state == 'shutdown': if self.daemon_state == 'run' or self.daemon_state == 'shutdown':
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i') self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={}) transition_thread = Thread(target=self.become_secondary, args=(), kwargs={})
transition_thread.start() transition_thread.start()
else: else:
# We did nothing, so just become secondary state # We did nothing, so just become secondary state
@ -157,11 +153,11 @@ class NodeInstance(object):
# Do flushing in a thread so it doesn't block the migrates out # Do flushing in a thread so it doesn't block the migrates out
if self.domain_state == 'flush': if self.domain_state == 'flush':
self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) self.flush_thread = Thread(target=self.flush, args=(), kwargs={})
self.flush_thread.start() self.flush_thread.start()
# Do unflushing in a thread so it doesn't block the migrates in # Do unflushing in a thread so it doesn't block the migrates in
if self.domain_state == 'unflush': if self.domain_state == 'unflush':
self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) self.flush_thread = Thread(target=self.unflush, args=(), kwargs={})
self.flush_thread.start() self.flush_thread.start()
@self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name)) @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))

View File

@ -21,17 +21,13 @@
############################################################################### ###############################################################################
import os import os
import sys
import uuid import uuid
import time import time
import threading
import libvirt import libvirt
from threading import Thread, Event
from collections import deque from collections import deque
import fcntl
import signal
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
@ -62,12 +58,12 @@ class VMConsoleWatcherInstance(object):
# Thread options # Thread options
self.thread = None self.thread = None
self.thread_stopper = threading.Event() self.thread_stopper = Event()
# Start execution thread # Start execution thread
def start(self): def start(self):
self.thread_stopper.clear() self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={}) self.thread = Thread(target=self.run, args=(), kwargs={})
self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid)) self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
self.thread.start() self.thread.start()

View File

@ -20,16 +20,13 @@
# #
############################################################################### ###############################################################################
import os
import sys
import uuid import uuid
import socket
import time import time
import threading
import libvirt import libvirt
import kazoo.client
import json import json
from threading import Thread
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common import pvcnoded.common as common
@ -138,7 +135,7 @@ class VMInstance(object):
# Perform a management command # Perform a management command
self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i') self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
state_thread = threading.Thread(target=self.manage_vm_state, args=(), kwargs={}) state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
state_thread.start() state_thread.start()
# Get data functions # Get data functions

View File

@ -21,8 +21,8 @@
############################################################################### ###############################################################################
import os import os
import sys
import time import time
from textwrap import dedent from textwrap import dedent
import pvcnoded.log as log import pvcnoded.log as log

View File

@ -21,18 +21,18 @@
############################################################################### ###############################################################################
import subprocess import subprocess
import threading
import signal import signal
import os
import time import time
import shlex
from threading import Thread
from shlex import split as shlex_split
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
class OSDaemon(object): class OSDaemon(object):
def __init__(self, command_string, environment, logfile): def __init__(self, command_string, environment, logfile):
command = shlex.split(command_string) command = shlex_split(command_string)
# Set stdout to be a logfile if set # Set stdout to be a logfile if set
if logfile: if logfile:
stdout = open(logfile, 'a') stdout = open(logfile, 'a')
@ -63,7 +63,7 @@ def run_os_daemon(command_string, environment=None, logfile=None):
# Run a oneshot command, optionally without blocking # Run a oneshot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None, timeout=None): def run_os_command(command_string, background=False, environment=None, timeout=None):
command = shlex.split(command_string) command = shlex_split(command_string)
if background: if background:
def runcmd(): def runcmd():
try: try:
@ -76,7 +76,7 @@ def run_os_command(command_string, background=False, environment=None, timeout=N
) )
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
pass pass
thread = threading.Thread(target=runcmd, args=()) thread = Thread(target=runcmd, args=())
thread.start() thread.start()
return 0, None, None return 0, None, None
else: else:

View File

@ -21,7 +21,6 @@
############################################################################### ###############################################################################
import time import time
import threading
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common import pvcnoded.common as common

View File

@ -20,7 +20,6 @@
# #
############################################################################### ###############################################################################
import kazoo.client
import uuid import uuid
# Child list function # Child list function