Implement domain log watching

Implements the ability for a client to watch almost-live domain
console logs from the hypervisors. It does this using a deque-based
"tail -f" mechanism (with a configurable buffer per-VM) that watches
the domain console logfile in the (configurable) directory every
half-second. It then stores the current buffer in Zookeeper when
changed, where a client can then request it, either as a static piece
of text in the `less` pager, or via a similar "tail -f" functionality
implemented using fixed line splitting and comparison to provide a
generally-seamless output.

Enabling this feature requires each guest VM to implement a Libvirt
serial log and write its (text) console to it, for example using the
default logging directory:

```
<serial type='pty'>
    <log file='/var/log/libvirt/vmname.log' append='off'/>
<serial>
```

The append mode can be either on or off; on grows files unbounded,
off causes the log (and hence the PVC log data) to be truncated on
initial VM startup from offline. The administrator must choose how
they best want to handle this until Libvirt implements their own
clog-type logging format.
This commit is contained in:
Joshua Boniface 2019-04-11 19:06:06 -04:00
parent cbad19adfe
commit 31f252ad9b
8 changed files with 289 additions and 36 deletions

View File

@ -571,6 +571,37 @@ def vm_info(domain, long_output):
retcode, retmsg = pvc_vm.get_info(zk_conn, domain, long_output)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc vm log
###############################################################################
@click.command(name='log', short_help='Show console logs of a VM object.')
@click.argument(
'domain'
)
@click.option(
'-l', '--lines', 'lines', default=1000, show_default=True,
help='Display this many log lines from the end of the log buffer.'
)
@click.option(
'-f', '--follow', 'follow', is_flag=True, default=False,
help='Follow the log buffer; output may be delayed by a few seconds relative to the live system. The --lines value defaults to 10 for the initial output.'
)
def vm_log(domain, lines, follow):
"""
Show console logs of virtual machine DOMAIN on its current node in the 'less' pager or continuously. DOMAIN may be a UUID or name. Note that migrating a VM to a different node will cause the log buffer to be overwritten by entries from the new node.
"""
# Open a Zookeeper connection
zk_conn = pvc_common.startZKConnection(zk_host)
if follow:
# Handle the "new" default of the follow
if lines == 1000:
lines = 10
retcode, retmsg = pvc_vm.follow_console_log(zk_conn, domain, lines)
else:
retcode, retmsg = pvc_vm.get_console_log(zk_conn, domain, lines)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc vm list
###############################################################################
@ -1331,6 +1362,7 @@ cli_vm.add_command(vm_move)
cli_vm.add_command(vm_migrate)
cli_vm.add_command(vm_unmigrate)
cli_vm.add_command(vm_info)
cli_vm.add_command(vm_log)
cli_vm.add_command(vm_list)
cli_network.add_command(net_add)

View File

@ -33,6 +33,8 @@ import lxml.objectify
import configparser
import kazoo.client
from collections import deque
import client_lib.ansiprint as ansiprint
import client_lib.zkhandler as zkhandler
import client_lib.common as common
@ -226,6 +228,7 @@ def define_vm(zk_conn, config_data, target_node, selector):
'/domains/{}/node'.format(dom_uuid): target_node,
'/domains/{}/lastnode'.format(dom_uuid): '',
'/domains/{}/failedreason'.format(dom_uuid): '',
'/domains/{}/consolelog'.format(dom_uuid): '',
'/domains/{}/xml'.format(dom_uuid): config_data
})
@ -233,7 +236,7 @@ def define_vm(zk_conn, config_data, target_node, selector):
def modify_vm(zk_conn, domain, restart, new_vm_config):
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
dom_name = getDomainName(zk_conn, domain)
@ -250,7 +253,7 @@ def modify_vm(zk_conn, domain, restart, new_vm_config):
def dump_vm(zk_conn, domain):
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
# Gram the domain XML and dump it to stdout
@ -262,7 +265,7 @@ def dump_vm(zk_conn, domain):
def undefine_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -300,7 +303,7 @@ def undefine_vm(zk_conn, domain):
def start_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -313,7 +316,7 @@ def start_vm(zk_conn, domain):
def restart_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -332,7 +335,7 @@ def restart_vm(zk_conn, domain):
def shutdown_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
# Get state and verify we're OK to proceed
@ -350,7 +353,7 @@ def shutdown_vm(zk_conn, domain):
def stop_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -366,7 +369,7 @@ def stop_vm(zk_conn, domain):
def move_vm(zk_conn, domain, target_node, selector):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -402,7 +405,7 @@ def move_vm(zk_conn, domain, target_node, selector):
def migrate_vm(zk_conn, domain, target_node, selector, force_migrate):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -446,7 +449,7 @@ def migrate_vm(zk_conn, domain, target_node, selector, force_migrate):
def unmigrate_vm(zk_conn, domain):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -476,7 +479,7 @@ def unmigrate_vm(zk_conn, domain):
def get_info(zk_conn, domain, long_output):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if dom_uuid == None:
if not dom_uuid:
common.stopZKConnection(zk_conn)
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
@ -494,6 +497,71 @@ def get_info(zk_conn, domain, long_output):
return True, ''
def get_console_log(zk_conn, domain, lines=1000):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if not dom_uuid:
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
# Get the data from ZK
console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
# Shrink the log buffer to length lines
shrunk_log = console_log.split('\n')[-lines:]
loglines = '\n'.join(shrunk_log)
# Show it in the pager (less)
try:
pager = subprocess.Popen(['less', '-R'], stdin=subprocess.PIPE)
pager.communicate(input=loglines.encode('utf8'))
except FileNotFoundError:
return False, 'ERROR: The "less" pager is required to view console logs.'
return True, ''
def follow_console_log(zk_conn, domain, lines=10):
# Validate and obtain alternate passed value
dom_uuid = getDomainUUID(zk_conn, domain)
if not dom_uuid:
return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
# Get the initial data from ZK
console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
# Shrink the log buffer to length lines
shrunk_log = console_log.split('\n')[-lines:]
loglines = '\n'.join(shrunk_log)
# Print the initial data and begin following
print(loglines, end='')
while True:
# Grab the next line set
new_console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid))
# Split the new and old log strings into constitutent lines
old_console_loglines = console_log.split('\n')
new_console_loglines = new_console_log.split('\n')
# Set the console log to the new log value for the next iteration
console_log = new_console_log
# Remove the lines from the old log until we hit the first line of the new log; this
# ensures that the old log is a string that we can remove from the new log entirely
for index, line in enumerate(old_console_loglines, start=0):
if line == new_console_loglines[0]:
del old_console_loglines[0:index]
break
# Rejoin the log lines into strings
old_console_log = '\n'.join(old_console_loglines)
new_console_log = '\n'.join(new_console_loglines)
# Remove the old lines from the new log
diff_console_log = new_console_log.replace(old_console_log, "")
# If there's a difference, print it out
if diff_console_log != "":
print(diff_console_log, end='')
# Wait a second
time.sleep(1)
return True, ''
def get_list(zk_conn, node, state, limit, raw):
if node != None:
# Verify node is valid

View File

@ -45,7 +45,7 @@ def deletekey(zk_conn, key, recursive=True):
# Data read function
def readdata(zk_conn, key):
data_raw = zk_conn.get(key)
data = data_raw[0].decode('ascii')
data = data_raw[0].decode('utf8')
meta = data_raw[1]
return data
@ -61,7 +61,7 @@ def writedata(zk_conn, kv):
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, str(data).encode('ascii'))
zk_transaction.create(key, str(data).encode('utf8'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
@ -71,7 +71,7 @@ def writedata(zk_conn, kv):
new_version = version + 1
# Update the data
zk_transaction.set_data(key, str(data).encode('ascii'))
zk_transaction.set_data(key, str(data).encode('utf8'))
# Set up the check
try:

View File

@ -111,12 +111,16 @@ pvc:
dynamic_directory: "/run/pvc"
# log_directory: Logging directory
log_directory: "/var/log/pvc"
# console_log_directory: Libvirt console logging directory
console_log_directory: "/var/log/libvirt"
# logging: PVC logging configuration
logging:
# file_logging: Enable or disable logging to files under log_directory
file_logging: True
# stdout_logging: Enable or disable logging to stdout (i.e. journald)
stdout_logging: True
# console_log_lines: Number of console log lines to store in Zookeeper per VM
console_log_lines: 1000
# networking: PVC networking configuration
# OPTIONAL if enable_networking: False
networking:

View File

@ -141,8 +141,10 @@ def readConfig(pvcd_config_file, myhostname):
'enable_storage': o_config['pvc']['functions']['enable_storage'],
'dynamic_directory': o_config['pvc']['system']['configuration']['directories']['dynamic_directory'],
'log_directory': o_config['pvc']['system']['configuration']['directories']['log_directory'],
'console_log_directory': o_config['pvc']['system']['configuration']['directories']['console_log_directory'],
'file_logging': o_config['pvc']['system']['configuration']['logging']['file_logging'],
'stdout_logging': o_config['pvc']['system']['configuration']['logging']['stdout_logging'],
'console_log_lines': o_config['pvc']['system']['configuration']['logging']['console_log_lines'],
'keepalive_interval': o_config['pvc']['system']['fencing']['intervals']['keepalive_interval'],
'fence_intervals': o_config['pvc']['system']['fencing']['intervals']['fence_intervals'],
'suicide_intervals': o_config['pvc']['system']['fencing']['intervals']['suicide_intervals'],
@ -457,12 +459,28 @@ zk_conn.add_listener(zk_listener)
# Cleanup function
def cleanup():
global zk_conn, update_timer
logger.out('Terminating pvcd and cleaning up', state='s')
global zk_conn, update_timer, d_domains
# Stop keepalive thread
try:
stopKeepaliveTimer()
except NameError:
pass
except AttributeError:
pass
logger.out('Terminating pvcd and cleaning up', state='s')
# Stop console logging on all VMs
logger.out('Stopping domain console watchers', state='s')
for domain in d_domain:
if d_domain[domain].getnode() == myhostname:
try:
d_domain[domain].console_log_instance.stop()
except NameError as e:
pass
except AttributeError as e:
pass
# Force into secondary network state if needed
if zkhandler.readdata(zk_conn, '/nodes/{}/routerstate'.format(myhostname)) == 'primary':
@ -471,12 +489,7 @@ def cleanup():
'/nodes/{}/routerstate'.format(myhostname): 'secondary',
'/primary_node': 'none'
})
else:
is_primary = False
# Wait for things to flush
if is_primary:
logger.out('Waiting for primary migration', state='s')
logger.out('Waiting 3 seconds for primary migration', state='s')
time.sleep(3)
# Set stop state in Zookeeper
@ -493,14 +506,11 @@ def cleanup():
pass
logger.out('Terminated pvc daemon', state='s')
# Handle exit gracefully
atexit.register(cleanup)
sys.exit(0)
# Termination function
def term(signum='', frame=''):
# Exit
sys.exit(0)
cleanup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)

View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
# DomainConsoleWatcherInstance.py - Class implementing a console log watcher for PVC domains
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import uuid
import time
import threading
import libvirt
from collections import deque
import fcntl
import signal
import pvcd.log as log
import pvcd.zkhandler as zkhandler
class DomainConsoleWatcherInstance(object):
# Initialization function
def __init__(self, domuuid, domname, zk_conn, config, logger, this_node):
self.domuuid = domuuid
self.domname = domname
self.zk_conn = zk_conn
self.config = config
self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
self.console_log_lines = config['console_log_lines']
self.logger = logger
self.this_node = this_node
# Try to append (create) the logfile and set its permissions
open(self.logfile, 'a').close()
os.chmod(self.logfile, 0o600)
self.logdeque = deque(open(self.logfile), self.console_log_lines)
self.stamp = None
self.cached_stamp = None
# Set up the deque with the current contents of the log
self.last_loglines = None
self.loglines = None
# Thread options
self.thread = None
self.thread_stopper = threading.Event()
# Start execution thread
def start(self):
self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={})
self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
self.thread.start()
# Stop execution thread
def stop(self):
self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
self.thread_stopper.set()
# Do one final flush
self.update()
# Main entrypoint
def run(self):
# Main loop
while not self.thread_stopper.is_set():
self.update()
time.sleep(0.5)
def update(self):
self.stamp = os.stat(self.logfile).st_mtime
if self.stamp != self.cached_stamp:
self.cached_stamp = self.stamp
self.fetch_lines()
# Update Zookeeper with the new loglines if they changed
if self.loglines != self.last_loglines:
zkhandler.writedata(self.zk_conn, { '/domains/{}/consolelog'.format(self.domuuid): self.loglines })
self.last_loglines = self.loglines
def fetch_lines(self):
self.logdeque = deque(open(self.logfile), self.console_log_lines)
self.loglines = ''.join(self.logdeque)

View File

@ -32,11 +32,14 @@ import kazoo.client
import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.DomainConsoleWatcherInstance as DomainConsoleWatcherInstance
class DomainInstance(object):
# Initialization function
def __init__(self, domuuid, zk_conn, config, logger, this_node):
# Passed-in variables on creation
self.domuuid = domuuid
self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid))
self.zk_conn = zk_conn
self.config = config
self.logger = logger
@ -52,8 +55,12 @@ class DomainInstance(object):
self.inshutdown = False
self.instop = False
# Libvirt domuuid
self.dom = self.lookupByUUID(self.domuuid)
# Log watcher instance
self.console_log_instance = DomainConsoleWatcherInstance.DomainConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node)
# Watch for changes to the state field in Zookeeper
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
def watch_state(data, stat, event=""):
@ -69,6 +76,7 @@ class DomainInstance(object):
else:
self.manage_vm_state()
# Get data functions
def getstate(self):
return self.state
@ -118,6 +126,9 @@ class DomainInstance(object):
# Start up the VM
def start_vm(self):
# Start the log watcher
self.console_log_instance.start()
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instart = True
@ -157,6 +168,7 @@ class DomainInstance(object):
self.dom = None
lv_conn.close()
self.instart = False
# Restart the VM
@ -193,6 +205,9 @@ class DomainInstance(object):
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Stop the VM forcibly
def stop_vm(self):
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
@ -210,6 +225,9 @@ class DomainInstance(object):
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Shutdown the VM gracefully
def shutdown_vm(self):
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
@ -238,6 +256,9 @@ class DomainInstance(object):
self.dom = None
self.inshutdown = False
# Stop the log watcher
self.console_log_instance.stop()
def live_migrate_vm(self, dest_node):
try:
dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node))
@ -245,7 +266,7 @@ class DomainInstance(object):
raise
except:
self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid))
return 1
return False
try:
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0)
@ -255,10 +276,10 @@ class DomainInstance(object):
except:
dest_lv_conn.close()
return 1
return False
dest_lv_conn.close()
return 0
return True
# Migrate the VM to a target host
def migrate_vm(self):
@ -268,9 +289,9 @@ class DomainInstance(object):
try:
migrate_ret = self.live_migrate_vm(self.node)
except:
migrate_ret = 0
migrate_ret = True
if migrate_ret != 0:
if not migrate_ret:
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
self.shutdown_vm()
time.sleep(1)
@ -281,8 +302,14 @@ class DomainInstance(object):
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.inmigrate = False
# Stop the log watcher
self.console_log_instance.stop()
# Receive the migration from another host (wait until VM is running)
def receive_migrate(self):
# Start the log watcher
self.console_log_instance.start()
self.inreceive = True
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
@ -360,10 +387,16 @@ class DomainInstance(object):
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM is already running and should be
if self.state == "start":
# Start the log watcher
self.console_log_instance.start()
# Add domain to running list
self.addDomainToList()
# VM is already running and should be but stuck in migrate state
elif self.state == "migrate":
# Start the log watcher
self.console_log_instance.start()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
# Add domain to running list
self.addDomainToList()
# VM should be restarted
elif self.state == "restart":
@ -377,9 +410,11 @@ class DomainInstance(object):
else:
# VM should be started
if self.state == "start":
# Start the domain
self.start_vm()
# VM should be migrated to this node
elif self.state == "migrate":
# Receive the migration
self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running)
if self.state == "restart":
@ -387,9 +422,13 @@ class DomainInstance(object):
# VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
# VM should be stoped; ensure it's gone from this node's domain_list
elif self.state == "stop":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
else:
# Conditional pass three - Is this VM currently running on this node

View File

@ -37,7 +37,7 @@ def deletekey(zk_conn, key, recursive=True):
# Data read function
def readdata(zk_conn, key):
data_raw = zk_conn.get(key)
data = data_raw[0].decode('ascii')
data = data_raw[0].decode('utf8')
meta = data_raw[1]
return data
@ -55,7 +55,7 @@ def writedata(zk_conn, kv):
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, str(data).encode('ascii'))
zk_transaction.create(key, str(data).encode('utf8'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
@ -65,7 +65,7 @@ def writedata(zk_conn, kv):
new_version = version + 1
# Update the data
zk_transaction.set_data(key, str(data).encode('ascii'))
zk_transaction.set_data(key, str(data).encode('utf8'))
# Set up the check
try: