diff --git a/client-cli/pvc.py b/client-cli/pvc.py index e0e20774..4dff0075 100755 --- a/client-cli/pvc.py +++ b/client-cli/pvc.py @@ -571,6 +571,37 @@ def vm_info(domain, long_output): retcode, retmsg = pvc_vm.get_info(zk_conn, domain, long_output) cleanup(retcode, retmsg, zk_conn) +############################################################################### +# pvc vm log +############################################################################### +@click.command(name='log', short_help='Show console logs of a VM object.') +@click.argument( + 'domain' +) +@click.option( + '-l', '--lines', 'lines', default=1000, show_default=True, + help='Display this many log lines from the end of the log buffer.' +) +@click.option( + '-f', '--follow', 'follow', is_flag=True, default=False, + help='Follow the log buffer; output may be delayed by a few seconds relative to the live system. The --lines value defaults to 10 for the initial output.' +) +def vm_log(domain, lines, follow): + """ + Show console logs of virtual machine DOMAIN on its current node in the 'less' pager or continuously. DOMAIN may be a UUID or name. Note that migrating a VM to a different node will cause the log buffer to be overwritten by entries from the new node. + """ + + # Open a Zookeeper connection + zk_conn = pvc_common.startZKConnection(zk_host) + if follow: + # Handle the "new" default of the follow + if lines == 1000: + lines = 10 + retcode, retmsg = pvc_vm.follow_console_log(zk_conn, domain, lines) + else: + retcode, retmsg = pvc_vm.get_console_log(zk_conn, domain, lines) + cleanup(retcode, retmsg, zk_conn) + ############################################################################### # pvc vm list ############################################################################### @@ -1331,6 +1362,7 @@ cli_vm.add_command(vm_move) cli_vm.add_command(vm_migrate) cli_vm.add_command(vm_unmigrate) cli_vm.add_command(vm_info) +cli_vm.add_command(vm_log) cli_vm.add_command(vm_list) cli_network.add_command(net_add) diff --git a/client-common/vm.py b/client-common/vm.py index fd2808d2..12931eb8 100644 --- a/client-common/vm.py +++ b/client-common/vm.py @@ -33,6 +33,8 @@ import lxml.objectify import configparser import kazoo.client +from collections import deque + import client_lib.ansiprint as ansiprint import client_lib.zkhandler as zkhandler import client_lib.common as common @@ -226,6 +228,7 @@ def define_vm(zk_conn, config_data, target_node, selector): '/domains/{}/node'.format(dom_uuid): target_node, '/domains/{}/lastnode'.format(dom_uuid): '', '/domains/{}/failedreason'.format(dom_uuid): '', + '/domains/{}/consolelog'.format(dom_uuid): '', '/domains/{}/xml'.format(dom_uuid): config_data }) @@ -233,7 +236,7 @@ def define_vm(zk_conn, config_data, target_node, selector): def modify_vm(zk_conn, domain, restart, new_vm_config): dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) dom_name = getDomainName(zk_conn, domain) @@ -250,7 +253,7 @@ def modify_vm(zk_conn, domain, restart, new_vm_config): def dump_vm(zk_conn, domain): dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Gram the domain XML and dump it to stdout @@ -262,7 +265,7 @@ def dump_vm(zk_conn, domain): def undefine_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -300,7 +303,7 @@ def undefine_vm(zk_conn, domain): def start_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -313,7 +316,7 @@ def start_vm(zk_conn, domain): def restart_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -332,7 +335,7 @@ def restart_vm(zk_conn, domain): def shutdown_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed @@ -350,7 +353,7 @@ def shutdown_vm(zk_conn, domain): def stop_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -366,7 +369,7 @@ def stop_vm(zk_conn, domain): def move_vm(zk_conn, domain, target_node, selector): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -402,7 +405,7 @@ def move_vm(zk_conn, domain, target_node, selector): def migrate_vm(zk_conn, domain, target_node, selector, force_migrate): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -446,7 +449,7 @@ def migrate_vm(zk_conn, domain, target_node, selector, force_migrate): def unmigrate_vm(zk_conn, domain): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -476,7 +479,7 @@ def unmigrate_vm(zk_conn, domain): def get_info(zk_conn, domain, long_output): # Validate and obtain alternate passed value dom_uuid = getDomainUUID(zk_conn, domain) - if dom_uuid == None: + if not dom_uuid: common.stopZKConnection(zk_conn) return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) @@ -494,6 +497,71 @@ def get_info(zk_conn, domain, long_output): return True, '' +def get_console_log(zk_conn, domain, lines=1000): + # Validate and obtain alternate passed value + dom_uuid = getDomainUUID(zk_conn, domain) + if not dom_uuid: + return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) + + # Get the data from ZK + console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid)) + + # Shrink the log buffer to length lines + shrunk_log = console_log.split('\n')[-lines:] + loglines = '\n'.join(shrunk_log) + + # Show it in the pager (less) + try: + pager = subprocess.Popen(['less', '-R'], stdin=subprocess.PIPE) + pager.communicate(input=loglines.encode('utf8')) + except FileNotFoundError: + return False, 'ERROR: The "less" pager is required to view console logs.' + + return True, '' + +def follow_console_log(zk_conn, domain, lines=10): + # Validate and obtain alternate passed value + dom_uuid = getDomainUUID(zk_conn, domain) + if not dom_uuid: + return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) + + # Get the initial data from ZK + console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid)) + + # Shrink the log buffer to length lines + shrunk_log = console_log.split('\n')[-lines:] + loglines = '\n'.join(shrunk_log) + + # Print the initial data and begin following + print(loglines, end='') + + while True: + # Grab the next line set + new_console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid)) + # Split the new and old log strings into constitutent lines + old_console_loglines = console_log.split('\n') + new_console_loglines = new_console_log.split('\n') + # Set the console log to the new log value for the next iteration + console_log = new_console_log + # Remove the lines from the old log until we hit the first line of the new log; this + # ensures that the old log is a string that we can remove from the new log entirely + for index, line in enumerate(old_console_loglines, start=0): + if line == new_console_loglines[0]: + del old_console_loglines[0:index] + break + # Rejoin the log lines into strings + old_console_log = '\n'.join(old_console_loglines) + new_console_log = '\n'.join(new_console_loglines) + # Remove the old lines from the new log + diff_console_log = new_console_log.replace(old_console_log, "") + # If there's a difference, print it out + if diff_console_log != "": + print(diff_console_log, end='') + # Wait a second + time.sleep(1) + + return True, '' + def get_list(zk_conn, node, state, limit, raw): if node != None: # Verify node is valid diff --git a/client-common/zkhandler.py b/client-common/zkhandler.py index 6e7d0b8c..a07e57f4 100644 --- a/client-common/zkhandler.py +++ b/client-common/zkhandler.py @@ -45,7 +45,7 @@ def deletekey(zk_conn, key, recursive=True): # Data read function def readdata(zk_conn, key): data_raw = zk_conn.get(key) - data = data_raw[0].decode('ascii') + data = data_raw[0].decode('utf8') meta = data_raw[1] return data @@ -61,7 +61,7 @@ def writedata(zk_conn, kv): # Check if this key already exists or not if not zk_conn.exists(key): # We're creating a new key - zk_transaction.create(key, str(data).encode('ascii')) + zk_transaction.create(key, str(data).encode('utf8')) else: # We're updating a key with version validation orig_data = zk_conn.get(key) @@ -71,7 +71,7 @@ def writedata(zk_conn, kv): new_version = version + 1 # Update the data - zk_transaction.set_data(key, str(data).encode('ascii')) + zk_transaction.set_data(key, str(data).encode('utf8')) # Set up the check try: diff --git a/node-daemon/pvcd.sample.yaml b/node-daemon/pvcd.sample.yaml index da033858..e85e66d3 100644 --- a/node-daemon/pvcd.sample.yaml +++ b/node-daemon/pvcd.sample.yaml @@ -111,12 +111,16 @@ pvc: dynamic_directory: "/run/pvc" # log_directory: Logging directory log_directory: "/var/log/pvc" + # console_log_directory: Libvirt console logging directory + console_log_directory: "/var/log/libvirt" # logging: PVC logging configuration logging: # file_logging: Enable or disable logging to files under log_directory file_logging: True # stdout_logging: Enable or disable logging to stdout (i.e. journald) stdout_logging: True + # console_log_lines: Number of console log lines to store in Zookeeper per VM + console_log_lines: 1000 # networking: PVC networking configuration # OPTIONAL if enable_networking: False networking: diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index 44be8781..d0201dfc 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -141,8 +141,10 @@ def readConfig(pvcd_config_file, myhostname): 'enable_storage': o_config['pvc']['functions']['enable_storage'], 'dynamic_directory': o_config['pvc']['system']['configuration']['directories']['dynamic_directory'], 'log_directory': o_config['pvc']['system']['configuration']['directories']['log_directory'], + 'console_log_directory': o_config['pvc']['system']['configuration']['directories']['console_log_directory'], 'file_logging': o_config['pvc']['system']['configuration']['logging']['file_logging'], 'stdout_logging': o_config['pvc']['system']['configuration']['logging']['stdout_logging'], + 'console_log_lines': o_config['pvc']['system']['configuration']['logging']['console_log_lines'], 'keepalive_interval': o_config['pvc']['system']['fencing']['intervals']['keepalive_interval'], 'fence_intervals': o_config['pvc']['system']['fencing']['intervals']['fence_intervals'], 'suicide_intervals': o_config['pvc']['system']['fencing']['intervals']['suicide_intervals'], @@ -457,12 +459,28 @@ zk_conn.add_listener(zk_listener) # Cleanup function def cleanup(): - global zk_conn, update_timer + logger.out('Terminating pvcd and cleaning up', state='s') + + global zk_conn, update_timer, d_domains # Stop keepalive thread - stopKeepaliveTimer() + try: + stopKeepaliveTimer() + except NameError: + pass + except AttributeError: + pass - logger.out('Terminating pvcd and cleaning up', state='s') + # Stop console logging on all VMs + logger.out('Stopping domain console watchers', state='s') + for domain in d_domain: + if d_domain[domain].getnode() == myhostname: + try: + d_domain[domain].console_log_instance.stop() + except NameError as e: + pass + except AttributeError as e: + pass # Force into secondary network state if needed if zkhandler.readdata(zk_conn, '/nodes/{}/routerstate'.format(myhostname)) == 'primary': @@ -471,12 +489,7 @@ def cleanup(): '/nodes/{}/routerstate'.format(myhostname): 'secondary', '/primary_node': 'none' }) - else: - is_primary = False - - # Wait for things to flush - if is_primary: - logger.out('Waiting for primary migration', state='s') + logger.out('Waiting 3 seconds for primary migration', state='s') time.sleep(3) # Set stop state in Zookeeper @@ -493,14 +506,11 @@ def cleanup(): pass logger.out('Terminated pvc daemon', state='s') - -# Handle exit gracefully -atexit.register(cleanup) + sys.exit(0) # Termination function def term(signum='', frame=''): - # Exit - sys.exit(0) + cleanup() # Handle signals gracefully signal.signal(signal.SIGTERM, term) diff --git a/node-daemon/pvcd/DomainConsoleWatcherInstance.py b/node-daemon/pvcd/DomainConsoleWatcherInstance.py new file mode 100644 index 00000000..36376f1a --- /dev/null +++ b/node-daemon/pvcd/DomainConsoleWatcherInstance.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 + +# DomainConsoleWatcherInstance.py - Class implementing a console log watcher for PVC domains +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +import os +import sys +import uuid +import time +import threading +import libvirt + +from collections import deque + +import fcntl +import signal + +import pvcd.log as log +import pvcd.zkhandler as zkhandler + +class DomainConsoleWatcherInstance(object): + # Initialization function + def __init__(self, domuuid, domname, zk_conn, config, logger, this_node): + self.domuuid = domuuid + self.domname = domname + self.zk_conn = zk_conn + self.config = config + self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname) + self.console_log_lines = config['console_log_lines'] + self.logger = logger + self.this_node = this_node + + # Try to append (create) the logfile and set its permissions + open(self.logfile, 'a').close() + os.chmod(self.logfile, 0o600) + + self.logdeque = deque(open(self.logfile), self.console_log_lines) + + self.stamp = None + self.cached_stamp = None + + # Set up the deque with the current contents of the log + self.last_loglines = None + self.loglines = None + + # Thread options + self.thread = None + self.thread_stopper = threading.Event() + + # Start execution thread + def start(self): + self.thread_stopper.clear() + self.thread = threading.Thread(target=self.run, args=(), kwargs={}) + self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.thread.start() + + # Stop execution thread + def stop(self): + self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.thread_stopper.set() + # Do one final flush + self.update() + + # Main entrypoint + def run(self): + # Main loop + while not self.thread_stopper.is_set(): + self.update() + time.sleep(0.5) + + def update(self): + self.stamp = os.stat(self.logfile).st_mtime + if self.stamp != self.cached_stamp: + self.cached_stamp = self.stamp + self.fetch_lines() + # Update Zookeeper with the new loglines if they changed + if self.loglines != self.last_loglines: + zkhandler.writedata(self.zk_conn, { '/domains/{}/consolelog'.format(self.domuuid): self.loglines }) + self.last_loglines = self.loglines + + def fetch_lines(self): + self.logdeque = deque(open(self.logfile), self.console_log_lines) + self.loglines = ''.join(self.logdeque) diff --git a/node-daemon/pvcd/DomainInstance.py b/node-daemon/pvcd/DomainInstance.py index 910f9a7e..32a1ece3 100644 --- a/node-daemon/pvcd/DomainInstance.py +++ b/node-daemon/pvcd/DomainInstance.py @@ -32,11 +32,14 @@ import kazoo.client import pvcd.log as log import pvcd.zkhandler as zkhandler +import pvcd.DomainConsoleWatcherInstance as DomainConsoleWatcherInstance + class DomainInstance(object): # Initialization function def __init__(self, domuuid, zk_conn, config, logger, this_node): # Passed-in variables on creation self.domuuid = domuuid + self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid)) self.zk_conn = zk_conn self.config = config self.logger = logger @@ -52,8 +55,12 @@ class DomainInstance(object): self.inshutdown = False self.instop = False + # Libvirt domuuid self.dom = self.lookupByUUID(self.domuuid) + # Log watcher instance + self.console_log_instance = DomainConsoleWatcherInstance.DomainConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node) + # Watch for changes to the state field in Zookeeper @self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid)) def watch_state(data, stat, event=""): @@ -69,6 +76,7 @@ class DomainInstance(object): else: self.manage_vm_state() + # Get data functions def getstate(self): return self.state @@ -118,6 +126,9 @@ class DomainInstance(object): # Start up the VM def start_vm(self): + # Start the log watcher + self.console_log_instance.start() + self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid)) self.instart = True @@ -157,6 +168,7 @@ class DomainInstance(object): self.dom = None lv_conn.close() + self.instart = False # Restart the VM @@ -193,6 +205,9 @@ class DomainInstance(object): self.dom = None self.instop = False + # Stop the log watcher + self.console_log_instance.stop() + # Stop the VM forcibly def stop_vm(self): self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid)) @@ -210,6 +225,9 @@ class DomainInstance(object): self.dom = None self.instop = False + # Stop the log watcher + self.console_log_instance.stop() + # Shutdown the VM gracefully def shutdown_vm(self): self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid)) @@ -238,6 +256,9 @@ class DomainInstance(object): self.dom = None self.inshutdown = False + # Stop the log watcher + self.console_log_instance.stop() + def live_migrate_vm(self, dest_node): try: dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node)) @@ -245,7 +266,7 @@ class DomainInstance(object): raise except: self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid)) - return 1 + return False try: target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0) @@ -255,10 +276,10 @@ class DomainInstance(object): except: dest_lv_conn.close() - return 1 + return False dest_lv_conn.close() - return 0 + return True # Migrate the VM to a target host def migrate_vm(self): @@ -268,9 +289,9 @@ class DomainInstance(object): try: migrate_ret = self.live_migrate_vm(self.node) except: - migrate_ret = 0 + migrate_ret = True - if migrate_ret != 0: + if not migrate_ret: self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid)) self.shutdown_vm() time.sleep(1) @@ -281,8 +302,14 @@ class DomainInstance(object): zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) self.inmigrate = False + # Stop the log watcher + self.console_log_instance.stop() + # Receive the migration from another host (wait until VM is running) def receive_migrate(self): + # Start the log watcher + self.console_log_instance.start() + self.inreceive = True self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid)) while True: @@ -360,10 +387,16 @@ class DomainInstance(object): if running == libvirt.VIR_DOMAIN_RUNNING: # VM is already running and should be if self.state == "start": + # Start the log watcher + self.console_log_instance.start() + # Add domain to running list self.addDomainToList() # VM is already running and should be but stuck in migrate state elif self.state == "migrate": + # Start the log watcher + self.console_log_instance.start() zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) + # Add domain to running list self.addDomainToList() # VM should be restarted elif self.state == "restart": @@ -377,9 +410,11 @@ class DomainInstance(object): else: # VM should be started if self.state == "start": + # Start the domain self.start_vm() # VM should be migrated to this node elif self.state == "migrate": + # Receive the migration self.receive_migrate() # VM should be restarted (i.e. started since it isn't running) if self.state == "restart": @@ -387,9 +422,13 @@ class DomainInstance(object): # VM should be shut down; ensure it's gone from this node's domain_list elif self.state == "shutdown": self.removeDomainFromList() + # Stop the log watcher + self.console_log_instance.stop() # VM should be stoped; ensure it's gone from this node's domain_list elif self.state == "stop": self.removeDomainFromList() + # Stop the log watcher + self.console_log_instance.stop() else: # Conditional pass three - Is this VM currently running on this node diff --git a/node-daemon/pvcd/zkhandler.py b/node-daemon/pvcd/zkhandler.py index 279495cb..b1d351ce 100644 --- a/node-daemon/pvcd/zkhandler.py +++ b/node-daemon/pvcd/zkhandler.py @@ -37,7 +37,7 @@ def deletekey(zk_conn, key, recursive=True): # Data read function def readdata(zk_conn, key): data_raw = zk_conn.get(key) - data = data_raw[0].decode('ascii') + data = data_raw[0].decode('utf8') meta = data_raw[1] return data @@ -55,7 +55,7 @@ def writedata(zk_conn, kv): # Check if this key already exists or not if not zk_conn.exists(key): # We're creating a new key - zk_transaction.create(key, str(data).encode('ascii')) + zk_transaction.create(key, str(data).encode('utf8')) else: # We're updating a key with version validation orig_data = zk_conn.get(key) @@ -65,7 +65,7 @@ def writedata(zk_conn, kv): new_version = version + 1 # Update the data - zk_transaction.set_data(key, str(data).encode('ascii')) + zk_transaction.set_data(key, str(data).encode('utf8')) # Set up the check try: