Implement domain log watching
Implements the ability for a client to watch almost-live domain console logs from the hypervisors. It does this using a deque-based "tail -f" mechanism (with a configurable buffer per-VM) that watches the domain console logfile in the (configurable) directory every half-second. It then stores the current buffer in Zookeeper when changed, where a client can then request it, either as a static piece of text in the `less` pager, or via a similar "tail -f" functionality implemented using fixed line splitting and comparison to provide a generally-seamless output. Enabling this feature requires each guest VM to implement a Libvirt serial log and write its (text) console to it, for example using the default logging directory: ``` <serial type='pty'> <log file='/var/log/libvirt/vmname.log' append='off'/> <serial> ``` The append mode can be either on or off; on grows files unbounded, off causes the log (and hence the PVC log data) to be truncated on initial VM startup from offline. The administrator must choose how they best want to handle this until Libvirt implements their own clog-type logging format.
This commit is contained in:
parent
5ad2dda6d4
commit
b6ecd36588
|
@ -571,6 +571,37 @@ def vm_info(domain, long_output):
|
||||||
retcode, retmsg = pvc_vm.get_info(zk_conn, domain, long_output)
|
retcode, retmsg = pvc_vm.get_info(zk_conn, domain, long_output)
|
||||||
cleanup(retcode, retmsg, zk_conn)
|
cleanup(retcode, retmsg, zk_conn)
|
||||||
|
|
||||||
|
###############################################################################
|
||||||
|
# pvc vm log
|
||||||
|
###############################################################################
|
||||||
|
@click.command(name='log', short_help='Show console logs of a VM object.')
|
||||||
|
@click.argument(
|
||||||
|
'domain'
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
'-l', '--lines', 'lines', default=1000, show_default=True,
|
||||||
|
help='Display this many log lines from the end of the log buffer.'
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
'-f', '--follow', 'follow', is_flag=True, default=False,
|
||||||
|
help='Follow the log buffer; output may be delayed by a few seconds relative to the live system. The --lines value defaults to 10 for the initial output.'
|
||||||
|
)
|
||||||
|
def vm_log(domain, lines, follow):
|
||||||
|
"""
|
||||||
|
Show console logs of virtual machine DOMAIN on its current node in the 'less' pager or continuously. DOMAIN may be a UUID or name. Note that migrating a VM to a different node will cause the log buffer to be overwritten by entries from the new node.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Open a Zookeeper connection
|
||||||
|
zk_conn = pvc_common.startZKConnection(zk_host)
|
||||||
|
if follow:
|
||||||
|
# Handle the "new" default of the follow
|
||||||
|
if lines == 1000:
|
||||||
|
lines = 10
|
||||||
|
retcode, retmsg = pvc_vm.follow_console_log(zk_conn, domain, lines)
|
||||||
|
else:
|
||||||
|
retcode, retmsg = pvc_vm.get_console_log(zk_conn, domain, lines)
|
||||||
|
cleanup(retcode, retmsg, zk_conn)
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# pvc vm list
|
# pvc vm list
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
@ -1331,6 +1362,7 @@ cli_vm.add_command(vm_move)
|
||||||
cli_vm.add_command(vm_migrate)
|
cli_vm.add_command(vm_migrate)
|
||||||
cli_vm.add_command(vm_unmigrate)
|
cli_vm.add_command(vm_unmigrate)
|
||||||
cli_vm.add_command(vm_info)
|
cli_vm.add_command(vm_info)
|
||||||
|
cli_vm.add_command(vm_log)
|
||||||
cli_vm.add_command(vm_list)
|
cli_vm.add_command(vm_list)
|
||||||
|
|
||||||
cli_network.add_command(net_add)
|
cli_network.add_command(net_add)
|
||||||
|
|
|
@ -33,6 +33,8 @@ import lxml.objectify
|
||||||
import configparser
|
import configparser
|
||||||
import kazoo.client
|
import kazoo.client
|
||||||
|
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
import client_lib.ansiprint as ansiprint
|
import client_lib.ansiprint as ansiprint
|
||||||
import client_lib.zkhandler as zkhandler
|
import client_lib.zkhandler as zkhandler
|
||||||
import client_lib.common as common
|
import client_lib.common as common
|
||||||
|
@ -226,6 +228,7 @@ def define_vm(zk_conn, config_data, target_node, selector):
|
||||||
'/domains/{}/node'.format(dom_uuid): target_node,
|
'/domains/{}/node'.format(dom_uuid): target_node,
|
||||||
'/domains/{}/lastnode'.format(dom_uuid): '',
|
'/domains/{}/lastnode'.format(dom_uuid): '',
|
||||||
'/domains/{}/failedreason'.format(dom_uuid): '',
|
'/domains/{}/failedreason'.format(dom_uuid): '',
|
||||||
|
'/domains/{}/consolelog'.format(dom_uuid): '',
|
||||||
'/domains/{}/xml'.format(dom_uuid): config_data
|
'/domains/{}/xml'.format(dom_uuid): config_data
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -233,7 +236,7 @@ def define_vm(zk_conn, config_data, target_node, selector):
|
||||||
|
|
||||||
def modify_vm(zk_conn, domain, restart, new_vm_config):
|
def modify_vm(zk_conn, domain, restart, new_vm_config):
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
dom_name = getDomainName(zk_conn, domain)
|
dom_name = getDomainName(zk_conn, domain)
|
||||||
|
|
||||||
|
@ -250,7 +253,7 @@ def modify_vm(zk_conn, domain, restart, new_vm_config):
|
||||||
|
|
||||||
def dump_vm(zk_conn, domain):
|
def dump_vm(zk_conn, domain):
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
# Gram the domain XML and dump it to stdout
|
# Gram the domain XML and dump it to stdout
|
||||||
|
@ -262,7 +265,7 @@ def dump_vm(zk_conn, domain):
|
||||||
def undefine_vm(zk_conn, domain):
|
def undefine_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -300,7 +303,7 @@ def undefine_vm(zk_conn, domain):
|
||||||
def start_vm(zk_conn, domain):
|
def start_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -313,7 +316,7 @@ def start_vm(zk_conn, domain):
|
||||||
def restart_vm(zk_conn, domain):
|
def restart_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -332,7 +335,7 @@ def restart_vm(zk_conn, domain):
|
||||||
def shutdown_vm(zk_conn, domain):
|
def shutdown_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
# Get state and verify we're OK to proceed
|
# Get state and verify we're OK to proceed
|
||||||
|
@ -350,7 +353,7 @@ def shutdown_vm(zk_conn, domain):
|
||||||
def stop_vm(zk_conn, domain):
|
def stop_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -366,7 +369,7 @@ def stop_vm(zk_conn, domain):
|
||||||
def move_vm(zk_conn, domain, target_node, selector):
|
def move_vm(zk_conn, domain, target_node, selector):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -402,7 +405,7 @@ def move_vm(zk_conn, domain, target_node, selector):
|
||||||
def migrate_vm(zk_conn, domain, target_node, selector, force_migrate):
|
def migrate_vm(zk_conn, domain, target_node, selector, force_migrate):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -446,7 +449,7 @@ def migrate_vm(zk_conn, domain, target_node, selector, force_migrate):
|
||||||
def unmigrate_vm(zk_conn, domain):
|
def unmigrate_vm(zk_conn, domain):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -476,7 +479,7 @@ def unmigrate_vm(zk_conn, domain):
|
||||||
def get_info(zk_conn, domain, long_output):
|
def get_info(zk_conn, domain, long_output):
|
||||||
# Validate and obtain alternate passed value
|
# Validate and obtain alternate passed value
|
||||||
dom_uuid = getDomainUUID(zk_conn, domain)
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
if dom_uuid == None:
|
if not dom_uuid:
|
||||||
common.stopZKConnection(zk_conn)
|
common.stopZKConnection(zk_conn)
|
||||||
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
@ -494,6 +497,71 @@ def get_info(zk_conn, domain, long_output):
|
||||||
|
|
||||||
return True, ''
|
return True, ''
|
||||||
|
|
||||||
|
def get_console_log(zk_conn, domain, lines=1000):
|
||||||
|
# Validate and obtain alternate passed value
|
||||||
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
|
if not dom_uuid:
|
||||||
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
# Get the data from ZK
|
||||||
|
console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
|
||||||
|
|
||||||
|
# Shrink the log buffer to length lines
|
||||||
|
shrunk_log = console_log.split('\n')[-lines:]
|
||||||
|
loglines = '\n'.join(shrunk_log)
|
||||||
|
|
||||||
|
# Show it in the pager (less)
|
||||||
|
try:
|
||||||
|
pager = subprocess.Popen(['less', '-R'], stdin=subprocess.PIPE)
|
||||||
|
pager.communicate(input=loglines.encode('utf8'))
|
||||||
|
except FileNotFoundError:
|
||||||
|
return False, 'ERROR: The "less" pager is required to view console logs.'
|
||||||
|
|
||||||
|
return True, ''
|
||||||
|
|
||||||
|
def follow_console_log(zk_conn, domain, lines=10):
|
||||||
|
# Validate and obtain alternate passed value
|
||||||
|
dom_uuid = getDomainUUID(zk_conn, domain)
|
||||||
|
if not dom_uuid:
|
||||||
|
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
|
||||||
|
|
||||||
|
# Get the initial data from ZK
|
||||||
|
console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
|
||||||
|
|
||||||
|
# Shrink the log buffer to length lines
|
||||||
|
shrunk_log = console_log.split('\n')[-lines:]
|
||||||
|
loglines = '\n'.join(shrunk_log)
|
||||||
|
|
||||||
|
# Print the initial data and begin following
|
||||||
|
print(loglines, end='')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Grab the next line set
|
||||||
|
new_console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
|
||||||
|
# Split the new and old log strings into constitutent lines
|
||||||
|
old_console_loglines = console_log.split('\n')
|
||||||
|
new_console_loglines = new_console_log.split('\n')
|
||||||
|
# Set the console log to the new log value for the next iteration
|
||||||
|
console_log = new_console_log
|
||||||
|
# Remove the lines from the old log until we hit the first line of the new log; this
|
||||||
|
# ensures that the old log is a string that we can remove from the new log entirely
|
||||||
|
for index, line in enumerate(old_console_loglines, start=0):
|
||||||
|
if line == new_console_loglines[0]:
|
||||||
|
del old_console_loglines[0:index]
|
||||||
|
break
|
||||||
|
# Rejoin the log lines into strings
|
||||||
|
old_console_log = '\n'.join(old_console_loglines)
|
||||||
|
new_console_log = '\n'.join(new_console_loglines)
|
||||||
|
# Remove the old lines from the new log
|
||||||
|
diff_console_log = new_console_log.replace(old_console_log, "")
|
||||||
|
# If there's a difference, print it out
|
||||||
|
if diff_console_log != "":
|
||||||
|
print(diff_console_log, end='')
|
||||||
|
# Wait a second
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
return True, ''
|
||||||
|
|
||||||
def get_list(zk_conn, node, state, limit, raw):
|
def get_list(zk_conn, node, state, limit, raw):
|
||||||
if node != None:
|
if node != None:
|
||||||
# Verify node is valid
|
# Verify node is valid
|
||||||
|
|
|
@ -45,7 +45,7 @@ def deletekey(zk_conn, key, recursive=True):
|
||||||
# Data read function
|
# Data read function
|
||||||
def readdata(zk_conn, key):
|
def readdata(zk_conn, key):
|
||||||
data_raw = zk_conn.get(key)
|
data_raw = zk_conn.get(key)
|
||||||
data = data_raw[0].decode('ascii')
|
data = data_raw[0].decode('utf8')
|
||||||
meta = data_raw[1]
|
meta = data_raw[1]
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ def writedata(zk_conn, kv):
|
||||||
# Check if this key already exists or not
|
# Check if this key already exists or not
|
||||||
if not zk_conn.exists(key):
|
if not zk_conn.exists(key):
|
||||||
# We're creating a new key
|
# We're creating a new key
|
||||||
zk_transaction.create(key, str(data).encode('ascii'))
|
zk_transaction.create(key, str(data).encode('utf8'))
|
||||||
else:
|
else:
|
||||||
# We're updating a key with version validation
|
# We're updating a key with version validation
|
||||||
orig_data = zk_conn.get(key)
|
orig_data = zk_conn.get(key)
|
||||||
|
@ -71,7 +71,7 @@ def writedata(zk_conn, kv):
|
||||||
new_version = version + 1
|
new_version = version + 1
|
||||||
|
|
||||||
# Update the data
|
# Update the data
|
||||||
zk_transaction.set_data(key, str(data).encode('ascii'))
|
zk_transaction.set_data(key, str(data).encode('utf8'))
|
||||||
|
|
||||||
# Set up the check
|
# Set up the check
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -111,12 +111,16 @@ pvc:
|
||||||
dynamic_directory: "/run/pvc"
|
dynamic_directory: "/run/pvc"
|
||||||
# log_directory: Logging directory
|
# log_directory: Logging directory
|
||||||
log_directory: "/var/log/pvc"
|
log_directory: "/var/log/pvc"
|
||||||
|
# console_log_directory: Libvirt console logging directory
|
||||||
|
console_log_directory: "/var/log/libvirt"
|
||||||
# logging: PVC logging configuration
|
# logging: PVC logging configuration
|
||||||
logging:
|
logging:
|
||||||
# file_logging: Enable or disable logging to files under log_directory
|
# file_logging: Enable or disable logging to files under log_directory
|
||||||
file_logging: True
|
file_logging: True
|
||||||
# stdout_logging: Enable or disable logging to stdout (i.e. journald)
|
# stdout_logging: Enable or disable logging to stdout (i.e. journald)
|
||||||
stdout_logging: True
|
stdout_logging: True
|
||||||
|
# console_log_lines: Number of console log lines to store in Zookeeper per VM
|
||||||
|
console_log_lines: 1000
|
||||||
# networking: PVC networking configuration
|
# networking: PVC networking configuration
|
||||||
# OPTIONAL if enable_networking: False
|
# OPTIONAL if enable_networking: False
|
||||||
networking:
|
networking:
|
||||||
|
|
|
@ -141,8 +141,10 @@ def readConfig(pvcd_config_file, myhostname):
|
||||||
'enable_storage': o_config['pvc']['functions']['enable_storage'],
|
'enable_storage': o_config['pvc']['functions']['enable_storage'],
|
||||||
'dynamic_directory': o_config['pvc']['system']['configuration']['directories']['dynamic_directory'],
|
'dynamic_directory': o_config['pvc']['system']['configuration']['directories']['dynamic_directory'],
|
||||||
'log_directory': o_config['pvc']['system']['configuration']['directories']['log_directory'],
|
'log_directory': o_config['pvc']['system']['configuration']['directories']['log_directory'],
|
||||||
|
'console_log_directory': o_config['pvc']['system']['configuration']['directories']['console_log_directory'],
|
||||||
'file_logging': o_config['pvc']['system']['configuration']['logging']['file_logging'],
|
'file_logging': o_config['pvc']['system']['configuration']['logging']['file_logging'],
|
||||||
'stdout_logging': o_config['pvc']['system']['configuration']['logging']['stdout_logging'],
|
'stdout_logging': o_config['pvc']['system']['configuration']['logging']['stdout_logging'],
|
||||||
|
'console_log_lines': o_config['pvc']['system']['configuration']['logging']['console_log_lines'],
|
||||||
'keepalive_interval': o_config['pvc']['system']['fencing']['intervals']['keepalive_interval'],
|
'keepalive_interval': o_config['pvc']['system']['fencing']['intervals']['keepalive_interval'],
|
||||||
'fence_intervals': o_config['pvc']['system']['fencing']['intervals']['fence_intervals'],
|
'fence_intervals': o_config['pvc']['system']['fencing']['intervals']['fence_intervals'],
|
||||||
'suicide_intervals': o_config['pvc']['system']['fencing']['intervals']['suicide_intervals'],
|
'suicide_intervals': o_config['pvc']['system']['fencing']['intervals']['suicide_intervals'],
|
||||||
|
@ -457,12 +459,28 @@ zk_conn.add_listener(zk_listener)
|
||||||
|
|
||||||
# Cleanup function
|
# Cleanup function
|
||||||
def cleanup():
|
def cleanup():
|
||||||
global zk_conn, update_timer
|
logger.out('Terminating pvcd and cleaning up', state='s')
|
||||||
|
|
||||||
|
global zk_conn, update_timer, d_domains
|
||||||
|
|
||||||
# Stop keepalive thread
|
# Stop keepalive thread
|
||||||
|
try:
|
||||||
stopKeepaliveTimer()
|
stopKeepaliveTimer()
|
||||||
|
except NameError:
|
||||||
|
pass
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.out('Terminating pvcd and cleaning up', state='s')
|
# Stop console logging on all VMs
|
||||||
|
logger.out('Stopping domain console watchers', state='s')
|
||||||
|
for domain in d_domain:
|
||||||
|
if d_domain[domain].getnode() == myhostname:
|
||||||
|
try:
|
||||||
|
d_domain[domain].console_log_instance.stop()
|
||||||
|
except NameError as e:
|
||||||
|
pass
|
||||||
|
except AttributeError as e:
|
||||||
|
pass
|
||||||
|
|
||||||
# Force into secondary network state if needed
|
# Force into secondary network state if needed
|
||||||
if zkhandler.readdata(zk_conn, '/nodes/{}/routerstate'.format(myhostname)) == 'primary':
|
if zkhandler.readdata(zk_conn, '/nodes/{}/routerstate'.format(myhostname)) == 'primary':
|
||||||
|
@ -471,12 +489,7 @@ def cleanup():
|
||||||
'/nodes/{}/routerstate'.format(myhostname): 'secondary',
|
'/nodes/{}/routerstate'.format(myhostname): 'secondary',
|
||||||
'/primary_node': 'none'
|
'/primary_node': 'none'
|
||||||
})
|
})
|
||||||
else:
|
logger.out('Waiting 3 seconds for primary migration', state='s')
|
||||||
is_primary = False
|
|
||||||
|
|
||||||
# Wait for things to flush
|
|
||||||
if is_primary:
|
|
||||||
logger.out('Waiting for primary migration', state='s')
|
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
|
||||||
# Set stop state in Zookeeper
|
# Set stop state in Zookeeper
|
||||||
|
@ -493,14 +506,11 @@ def cleanup():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
logger.out('Terminated pvc daemon', state='s')
|
logger.out('Terminated pvc daemon', state='s')
|
||||||
|
sys.exit(0)
|
||||||
# Handle exit gracefully
|
|
||||||
atexit.register(cleanup)
|
|
||||||
|
|
||||||
# Termination function
|
# Termination function
|
||||||
def term(signum='', frame=''):
|
def term(signum='', frame=''):
|
||||||
# Exit
|
cleanup()
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# Handle signals gracefully
|
# Handle signals gracefully
|
||||||
signal.signal(signal.SIGTERM, term)
|
signal.signal(signal.SIGTERM, term)
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# DomainConsoleWatcherInstance.py - Class implementing a console log watcher for PVC domains
|
||||||
|
# Part of the Parallel Virtual Cluster (PVC) system
|
||||||
|
#
|
||||||
|
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
###############################################################################
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import libvirt
|
||||||
|
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
import fcntl
|
||||||
|
import signal
|
||||||
|
|
||||||
|
import pvcd.log as log
|
||||||
|
import pvcd.zkhandler as zkhandler
|
||||||
|
|
||||||
|
class DomainConsoleWatcherInstance(object):
|
||||||
|
# Initialization function
|
||||||
|
def __init__(self, domuuid, domname, zk_conn, config, logger, this_node):
|
||||||
|
self.domuuid = domuuid
|
||||||
|
self.domname = domname
|
||||||
|
self.zk_conn = zk_conn
|
||||||
|
self.config = config
|
||||||
|
self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
|
||||||
|
self.console_log_lines = config['console_log_lines']
|
||||||
|
self.logger = logger
|
||||||
|
self.this_node = this_node
|
||||||
|
|
||||||
|
# Try to append (create) the logfile and set its permissions
|
||||||
|
open(self.logfile, 'a').close()
|
||||||
|
os.chmod(self.logfile, 0o600)
|
||||||
|
|
||||||
|
self.logdeque = deque(open(self.logfile), self.console_log_lines)
|
||||||
|
|
||||||
|
self.stamp = None
|
||||||
|
self.cached_stamp = None
|
||||||
|
|
||||||
|
# Set up the deque with the current contents of the log
|
||||||
|
self.last_loglines = None
|
||||||
|
self.loglines = None
|
||||||
|
|
||||||
|
# Thread options
|
||||||
|
self.thread = None
|
||||||
|
self.thread_stopper = threading.Event()
|
||||||
|
|
||||||
|
# Start execution thread
|
||||||
|
def start(self):
|
||||||
|
self.thread_stopper.clear()
|
||||||
|
self.thread = threading.Thread(target=self.run, args=(), kwargs={})
|
||||||
|
self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
# Stop execution thread
|
||||||
|
def stop(self):
|
||||||
|
self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
|
self.thread_stopper.set()
|
||||||
|
# Do one final flush
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
# Main entrypoint
|
||||||
|
def run(self):
|
||||||
|
# Main loop
|
||||||
|
while not self.thread_stopper.is_set():
|
||||||
|
self.update()
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
self.stamp = os.stat(self.logfile).st_mtime
|
||||||
|
if self.stamp != self.cached_stamp:
|
||||||
|
self.cached_stamp = self.stamp
|
||||||
|
self.fetch_lines()
|
||||||
|
# Update Zookeeper with the new loglines if they changed
|
||||||
|
if self.loglines != self.last_loglines:
|
||||||
|
zkhandler.writedata(self.zk_conn, { '/domains/{}/consolelog'.format(self.domuuid): self.loglines })
|
||||||
|
self.last_loglines = self.loglines
|
||||||
|
|
||||||
|
def fetch_lines(self):
|
||||||
|
self.logdeque = deque(open(self.logfile), self.console_log_lines)
|
||||||
|
self.loglines = ''.join(self.logdeque)
|
|
@ -32,11 +32,14 @@ import kazoo.client
|
||||||
import pvcd.log as log
|
import pvcd.log as log
|
||||||
import pvcd.zkhandler as zkhandler
|
import pvcd.zkhandler as zkhandler
|
||||||
|
|
||||||
|
import pvcd.DomainConsoleWatcherInstance as DomainConsoleWatcherInstance
|
||||||
|
|
||||||
class DomainInstance(object):
|
class DomainInstance(object):
|
||||||
# Initialization function
|
# Initialization function
|
||||||
def __init__(self, domuuid, zk_conn, config, logger, this_node):
|
def __init__(self, domuuid, zk_conn, config, logger, this_node):
|
||||||
# Passed-in variables on creation
|
# Passed-in variables on creation
|
||||||
self.domuuid = domuuid
|
self.domuuid = domuuid
|
||||||
|
self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid))
|
||||||
self.zk_conn = zk_conn
|
self.zk_conn = zk_conn
|
||||||
self.config = config
|
self.config = config
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
@ -52,8 +55,12 @@ class DomainInstance(object):
|
||||||
self.inshutdown = False
|
self.inshutdown = False
|
||||||
self.instop = False
|
self.instop = False
|
||||||
|
|
||||||
|
# Libvirt domuuid
|
||||||
self.dom = self.lookupByUUID(self.domuuid)
|
self.dom = self.lookupByUUID(self.domuuid)
|
||||||
|
|
||||||
|
# Log watcher instance
|
||||||
|
self.console_log_instance = DomainConsoleWatcherInstance.DomainConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node)
|
||||||
|
|
||||||
# Watch for changes to the state field in Zookeeper
|
# Watch for changes to the state field in Zookeeper
|
||||||
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
|
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
|
||||||
def watch_state(data, stat, event=""):
|
def watch_state(data, stat, event=""):
|
||||||
|
@ -69,6 +76,7 @@ class DomainInstance(object):
|
||||||
else:
|
else:
|
||||||
self.manage_vm_state()
|
self.manage_vm_state()
|
||||||
|
|
||||||
|
|
||||||
# Get data functions
|
# Get data functions
|
||||||
def getstate(self):
|
def getstate(self):
|
||||||
return self.state
|
return self.state
|
||||||
|
@ -118,6 +126,9 @@ class DomainInstance(object):
|
||||||
|
|
||||||
# Start up the VM
|
# Start up the VM
|
||||||
def start_vm(self):
|
def start_vm(self):
|
||||||
|
# Start the log watcher
|
||||||
|
self.console_log_instance.start()
|
||||||
|
|
||||||
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
self.instart = True
|
self.instart = True
|
||||||
|
|
||||||
|
@ -157,6 +168,7 @@ class DomainInstance(object):
|
||||||
self.dom = None
|
self.dom = None
|
||||||
|
|
||||||
lv_conn.close()
|
lv_conn.close()
|
||||||
|
|
||||||
self.instart = False
|
self.instart = False
|
||||||
|
|
||||||
# Restart the VM
|
# Restart the VM
|
||||||
|
@ -193,6 +205,9 @@ class DomainInstance(object):
|
||||||
self.dom = None
|
self.dom = None
|
||||||
self.instop = False
|
self.instop = False
|
||||||
|
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
|
|
||||||
# Stop the VM forcibly
|
# Stop the VM forcibly
|
||||||
def stop_vm(self):
|
def stop_vm(self):
|
||||||
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
|
@ -210,6 +225,9 @@ class DomainInstance(object):
|
||||||
self.dom = None
|
self.dom = None
|
||||||
self.instop = False
|
self.instop = False
|
||||||
|
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
|
|
||||||
# Shutdown the VM gracefully
|
# Shutdown the VM gracefully
|
||||||
def shutdown_vm(self):
|
def shutdown_vm(self):
|
||||||
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
|
@ -238,6 +256,9 @@ class DomainInstance(object):
|
||||||
self.dom = None
|
self.dom = None
|
||||||
self.inshutdown = False
|
self.inshutdown = False
|
||||||
|
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
|
|
||||||
def live_migrate_vm(self, dest_node):
|
def live_migrate_vm(self, dest_node):
|
||||||
try:
|
try:
|
||||||
dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node))
|
dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node))
|
||||||
|
@ -245,7 +266,7 @@ class DomainInstance(object):
|
||||||
raise
|
raise
|
||||||
except:
|
except:
|
||||||
self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid))
|
||||||
return 1
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0)
|
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0)
|
||||||
|
@ -255,10 +276,10 @@ class DomainInstance(object):
|
||||||
|
|
||||||
except:
|
except:
|
||||||
dest_lv_conn.close()
|
dest_lv_conn.close()
|
||||||
return 1
|
return False
|
||||||
|
|
||||||
dest_lv_conn.close()
|
dest_lv_conn.close()
|
||||||
return 0
|
return True
|
||||||
|
|
||||||
# Migrate the VM to a target host
|
# Migrate the VM to a target host
|
||||||
def migrate_vm(self):
|
def migrate_vm(self):
|
||||||
|
@ -268,9 +289,9 @@ class DomainInstance(object):
|
||||||
try:
|
try:
|
||||||
migrate_ret = self.live_migrate_vm(self.node)
|
migrate_ret = self.live_migrate_vm(self.node)
|
||||||
except:
|
except:
|
||||||
migrate_ret = 0
|
migrate_ret = True
|
||||||
|
|
||||||
if migrate_ret != 0:
|
if not migrate_ret:
|
||||||
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
|
||||||
self.shutdown_vm()
|
self.shutdown_vm()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
@ -281,8 +302,14 @@ class DomainInstance(object):
|
||||||
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
|
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
|
||||||
self.inmigrate = False
|
self.inmigrate = False
|
||||||
|
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
|
|
||||||
# Receive the migration from another host (wait until VM is running)
|
# Receive the migration from another host (wait until VM is running)
|
||||||
def receive_migrate(self):
|
def receive_migrate(self):
|
||||||
|
# Start the log watcher
|
||||||
|
self.console_log_instance.start()
|
||||||
|
|
||||||
self.inreceive = True
|
self.inreceive = True
|
||||||
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
|
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
|
||||||
while True:
|
while True:
|
||||||
|
@ -360,10 +387,16 @@ class DomainInstance(object):
|
||||||
if running == libvirt.VIR_DOMAIN_RUNNING:
|
if running == libvirt.VIR_DOMAIN_RUNNING:
|
||||||
# VM is already running and should be
|
# VM is already running and should be
|
||||||
if self.state == "start":
|
if self.state == "start":
|
||||||
|
# Start the log watcher
|
||||||
|
self.console_log_instance.start()
|
||||||
|
# Add domain to running list
|
||||||
self.addDomainToList()
|
self.addDomainToList()
|
||||||
# VM is already running and should be but stuck in migrate state
|
# VM is already running and should be but stuck in migrate state
|
||||||
elif self.state == "migrate":
|
elif self.state == "migrate":
|
||||||
|
# Start the log watcher
|
||||||
|
self.console_log_instance.start()
|
||||||
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
|
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
|
||||||
|
# Add domain to running list
|
||||||
self.addDomainToList()
|
self.addDomainToList()
|
||||||
# VM should be restarted
|
# VM should be restarted
|
||||||
elif self.state == "restart":
|
elif self.state == "restart":
|
||||||
|
@ -377,9 +410,11 @@ class DomainInstance(object):
|
||||||
else:
|
else:
|
||||||
# VM should be started
|
# VM should be started
|
||||||
if self.state == "start":
|
if self.state == "start":
|
||||||
|
# Start the domain
|
||||||
self.start_vm()
|
self.start_vm()
|
||||||
# VM should be migrated to this node
|
# VM should be migrated to this node
|
||||||
elif self.state == "migrate":
|
elif self.state == "migrate":
|
||||||
|
# Receive the migration
|
||||||
self.receive_migrate()
|
self.receive_migrate()
|
||||||
# VM should be restarted (i.e. started since it isn't running)
|
# VM should be restarted (i.e. started since it isn't running)
|
||||||
if self.state == "restart":
|
if self.state == "restart":
|
||||||
|
@ -387,9 +422,13 @@ class DomainInstance(object):
|
||||||
# VM should be shut down; ensure it's gone from this node's domain_list
|
# VM should be shut down; ensure it's gone from this node's domain_list
|
||||||
elif self.state == "shutdown":
|
elif self.state == "shutdown":
|
||||||
self.removeDomainFromList()
|
self.removeDomainFromList()
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
# VM should be stoped; ensure it's gone from this node's domain_list
|
# VM should be stoped; ensure it's gone from this node's domain_list
|
||||||
elif self.state == "stop":
|
elif self.state == "stop":
|
||||||
self.removeDomainFromList()
|
self.removeDomainFromList()
|
||||||
|
# Stop the log watcher
|
||||||
|
self.console_log_instance.stop()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Conditional pass three - Is this VM currently running on this node
|
# Conditional pass three - Is this VM currently running on this node
|
||||||
|
|
|
@ -37,7 +37,7 @@ def deletekey(zk_conn, key, recursive=True):
|
||||||
# Data read function
|
# Data read function
|
||||||
def readdata(zk_conn, key):
|
def readdata(zk_conn, key):
|
||||||
data_raw = zk_conn.get(key)
|
data_raw = zk_conn.get(key)
|
||||||
data = data_raw[0].decode('ascii')
|
data = data_raw[0].decode('utf8')
|
||||||
meta = data_raw[1]
|
meta = data_raw[1]
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ def writedata(zk_conn, kv):
|
||||||
# Check if this key already exists or not
|
# Check if this key already exists or not
|
||||||
if not zk_conn.exists(key):
|
if not zk_conn.exists(key):
|
||||||
# We're creating a new key
|
# We're creating a new key
|
||||||
zk_transaction.create(key, str(data).encode('ascii'))
|
zk_transaction.create(key, str(data).encode('utf8'))
|
||||||
else:
|
else:
|
||||||
# We're updating a key with version validation
|
# We're updating a key with version validation
|
||||||
orig_data = zk_conn.get(key)
|
orig_data = zk_conn.get(key)
|
||||||
|
@ -65,7 +65,7 @@ def writedata(zk_conn, kv):
|
||||||
new_version = version + 1
|
new_version = version + 1
|
||||||
|
|
||||||
# Update the data
|
# Update the data
|
||||||
zk_transaction.set_data(key, str(data).encode('ascii'))
|
zk_transaction.set_data(key, str(data).encode('utf8'))
|
||||||
|
|
||||||
# Set up the check
|
# Set up the check
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue