diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 714819e9..2c8dac62 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -4533,6 +4533,76 @@ class API_VM_Autobackup_Root(Resource): api.add_resource(API_VM_Autobackup_Root, "/vm/autobackup") +# /vm/automirror +class API_VM_Automirror_Root(Resource): + @RequestParser( + [ + {"name": "email_recipients"}, + { + "name": "email_errors_only", + "required": False, + }, + ] + ) + @Authenticator + def post(self, reqargs): + """ + Trigger a cluster automirror job + --- + tags: + - provisioner + parameters: + - in: query + name: email_recipients + type: string + required: false + description: A list of email addresses to send failure and report emails to, comma-separated + - in: query + name: email_errors_only + type: boolean + required: false + default: false + description: If set and true, only sends a report email to email_recipients when there is an error with at least one mirror + responses: + 202: + description: Accepted + schema: + type: object + description: The Celery job information of the task + id: CeleryTask + 400: + description: Bad request + schema: + type: object + id: Message + """ + + email_recipients = reqargs.get("email_recipients", None) + if email_recipients is not None and not isinstance(email_recipients, list): + email_recipients = [email_recipients] + + email_errors_only = bool(strtobool(reqargs.get("email_errors_only", "False"))) + + task = run_celery_task( + "cluster.automirror", + email_recipients=email_recipients, + email_errors_only=email_errors_only, + run_on="primary", + ) + return ( + { + "task_id": task.id, + "task_name": "cluster.automirror", + "run_on": f"{get_primary_node()} (primary)", + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) + + +api.add_resource(API_VM_Automirror_Root, "/vm/automirror") + + ########################################################## # Client API - Network ########################################################## diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py index 4eae588d..97347058 100644 --- a/client-cli/pvc/cli/cli.py +++ b/client-cli/pvc/cli/cli.py @@ -2577,6 +2577,89 @@ def cli_vm_autobackup(email_report, force_full_flag, wait_flag, cron_flag): finish(retcode, retmsg) +############################################################################### +# > pvc vm automirror +############################################################################### +@click.command( + name="automirror", short_help="Perform automatic virtual machine mirrors." +) +@connection_req +@click.option( + "--email-report", + "email_report", + default=None, + help="Email a mirror summary report to the specified address(es), comma-separated.", +) +@click.option( + "--email-errors-only", + "email_errors_only_flag", + is_flag=True, + default=False, + show_default=True, + help="Only send a mirror summary report when at least one error occurrs.", +) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress if waiting.", +) +@click.option( + "--cron", + "cron_flag", + is_flag=True, + default=False, + show_default=True, + help="Run in cron mode (returns immediately with no output once job is submitted).", +) +def cli_vm_automirror(email_report, email_errors_only_flag, wait_flag, cron_flag): + """ + Perform automated mirrors of VMs, with integrated cleanup and full/incremental scheduling. + + This command enables automatic mirrors of PVC VMs at the block level, leveraging the various "pvc vm snapshot" + functions with an internal rentention and cleanup system. VMs and the destination cluster(s) are selected based + on configured VM tags and a set of static configs in the cluster's `pvc.conf` configuration. + + This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines + how often mirrors are taken. Mirror retention is based only on the number of recorded mirrors on the remote side, + not on the time interval between them. Mirrors taken manually outside of the "automirror" command are not counted + towards the format or retention of automirrors. + + WARNING: Running this command manually will interfere with the schedule! Do not run manually except for testing. + + The actual details of the automirror, including retention policies, are defined in the main PVC configuration file + `/etc/pvc/pvc.conf`. See the sample configuration for more details. + + An optional report on all current mirrors can be emailed to one or more email addresses using the + "--email-report" flag. This report will include information on all current known mirrors. + """ + + if cron_flag: + wait_flag = False + + if email_report is not None: + email_recipients = email_report.split(",") + else: + email_recipients = None + + retcode, retmsg = pvc.lib.vm.vm_automirror( + CLI_CONFIG, + email_recipients=email_recipients, + email_errors_only_flag=email_errors_only_flag, + wait_flag=wait_flag, + ) + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) + + if cron_flag: + finish(retcode, None) + else: + finish(retcode, retmsg) + + ############################################################################### # > pvc vm tag ############################################################################### @@ -6918,6 +7001,7 @@ cli_vm_backup.add_command(cli_vm_backup_restore) cli_vm_backup.add_command(cli_vm_backup_remove) cli_vm.add_command(cli_vm_backup) cli_vm.add_command(cli_vm_autobackup) +cli_vm.add_command(cli_vm_automirror) cli_vm_tag.add_command(cli_vm_tag_get) cli_vm_tag.add_command(cli_vm_tag_add) cli_vm_tag.add_command(cli_vm_tag_remove) diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py index e72ae889..8430a27f 100644 --- a/client-cli/pvc/lib/vm.py +++ b/client-cli/pvc/lib/vm.py @@ -714,6 +714,26 @@ def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_fla return get_wait_retdata(response, wait_flag) +def vm_automirror( + config, email_recipients=None, email_errors_only_flag=False, wait_flag=True +): + """ + Perform a cluster VM automirror + + API endpoint: POST /vm//automirror + API arguments: email_recipients=email_recipients, email_errors_only=email_errors_only_flag + API schema: {"message":"{data}"} + """ + params = { + "email_recipients": email_recipients, + "email_errors_only": email_errors_only_flag, + } + + response = call_api(config, "post", "/vm/automirror", params=params) + + return get_wait_retdata(response, wait_flag) + + def vm_vcpus_set(config, vm, vcpus, topology, restart): """ Set the vCPU count of the VM with topology diff --git a/daemon-common/automirror.py b/daemon-common/automirror.py new file mode 100644 index 00000000..36533fdd --- /dev/null +++ b/daemon-common/automirror.py @@ -0,0 +1,496 @@ +#!/usr/bin/env python3 + +# automirror.py - PVC API Automirror functions +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018-2024 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +import requests + +from datetime import datetime +from os import popen + +from daemon_lib.config import get_automirror_configuration +from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish + +import daemon_lib.vm as vm + + +def send_execution_failure_report( + celery, config, recipients=None, total_time=0, error=None +): + if recipients is None: + return + + from email.utils import formatdate + from socket import gethostname + + log_message = f"Sending email failure report to {', '.join(recipients)}" + log_info(celery, log_message) + + current_datetime = datetime.now() + email_datetime = formatdate(float(current_datetime.strftime("%s"))) + + email = list() + email.append(f"Date: {email_datetime}") + email.append( + f"Subject: PVC Automirror execution failure for cluster '{config['cluster']}'" + ) + + email_to = list() + for recipient in recipients: + email_to.append(f"<{recipient}>") + + email.append(f"To: {', '.join(email_to)}") + email.append(f"From: PVC Automirror System ") + email.append("") + + email.append( + f"A PVC automirror has FAILED at {current_datetime} in {total_time}s due to an execution error." + ) + email.append("") + email.append("The reported error message is:") + email.append(f" {error}") + + try: + with popen("/usr/sbin/sendmail -t", "w") as p: + p.write("\n".join(email)) + except Exception as e: + log_err(f"Failed to send report email: {e}") + + +def send_execution_summary_report( + celery, + config, + recipients=None, + total_time=0, + summary=dict(), + local_deleted_snapshots=dict(), +): + if recipients is None: + return + + from email.utils import formatdate + from socket import gethostname + + log_message = f"Sending email summary report to {', '.join(recipients)}" + log_info(celery, log_message) + + current_datetime = datetime.now() + email_datetime = formatdate(float(current_datetime.strftime("%s"))) + + email = list() + email.append(f"Date: {email_datetime}") + email.append(f"Subject: PVC Automirror report for cluster '{config['cluster']}'") + + email_to = list() + for recipient in recipients: + email_to.append(f"<{recipient}>") + + email.append(f"To: {', '.join(email_to)}") + email.append(f"From: PVC Automirror System ") + email.append("") + + email.append( + f"A PVC automirror has been completed at {current_datetime} in {total_time}." + ) + email.append("") + email.append( + "The following is a summary of all VM mirror jobs executed during this run:" + ) + email.append("") + + vm_names = {k.split(":")[0] for k in summary.keys()} + for vm_name in vm_names: + email.append(f"VM {vm_name}:") + email.append(" Mirror jobs:") + for destination_name in { + k.split(":")[1] for k in summary.keys() if k.split(":")[0] == vm_name + }: + mirror = summary[f"{vm_name}:{destination_name}"] + datestring = mirror.get("snapshot_name").replace("am", "") + mirror_date = datetime.strptime(datestring, "%Y%m%d%H%M%S") + if mirror.get("result", False): + email.append( + f" * {mirror_date}: Success to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}" + ) + else: + email.append( + f" * {mirror_date}: Failure to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}" + ) + email.append( + f" {mirror.get('result_message')}" + ) + + email.append( + " The following aged-out local snapshots were removed during cleanup:" + ) + for snapshot in local_deleted_snapshots[vm_name]: + email.append(f" * {snapshot}") + + try: + with popen("/usr/sbin/sendmail -t", "w") as p: + p.write("\n".join(email)) + except Exception as e: + log_err(f"Failed to send report email: {e}") + + +def run_vm_mirror( + zkhandler, celery, config, vm_detail, snapshot_name, destination_name +): + vm_name = vm_detail["name"] + keep_count = config["mirror_keep_snapshots"] + + try: + destination = config["mirror_destinations"][destination_name] + except Exception: + error_message = f"Failed to find valid destination cluster '{destination_name}' for VM '{vm_name}'" + log_err(celery, error_message) + return error_message + + destination_api_uri = f"{'https' if destination['ssl'] else 'http'}://{destination['address']}:{destination['port']}{destination['prefix']}" + destination_api_timeout = (3.05, 172800) + destination_api_headers = { + "X-Api-Key": destination["key"], + } + + session = requests.Session() + session.headers.update(destination_api_headers) + session.verify = destination["verify_ssl"] + session.timeout = destination_api_timeout + + # Get the last snapshot that is on the remote side for incrementals + response = session.get( + f"{destination_api_uri}/vm/{vm_name}", + params=None, + data=None, + ) + destination_vm_detail = response.json() + if type(destination_vm_detail) is list and len(destination_vm_detail) > 0: + destination_vm_detail = destination_vm_detail[0] + try: + last_snapshot_name = [ + s + for s in destination_vm_detail["snapshots"] + if s["name"].startswith("am") + ][0]["name"] + except Exception: + last_snapshot_name = None + else: + last_snapshot_name = None + + # Send the current snapshot + result, message = vm.vm_worker_send_snapshot( + zkhandler, + None, + vm_name, + snapshot_name, + destination_api_uri, + destination["key"], + destination_api_verify_ssl=destination["verify_ssl"], + incremental_parent=last_snapshot_name, + destination_storage_pool=destination["pool"], + return_status=True, + ) + + if not result: + return False, message + + response = session.get( + f"{destination_api_uri}/vm/{vm_name}", + params=None, + data=None, + ) + destination_vm_detail = response.json() + if type(destination_vm_detail) is list and len(destination_vm_detail) > 0: + destination_vm_detail = destination_vm_detail[0] + else: + message = "Remote VM somehow does not exist after successful mirror; skipping snapshot cleanup" + return False, message + + # Find any mirror snapshots that are expired + remote_snapshots = [ + s for s in destination_vm_detail["snapshots"] if s["name"].startswith("am") + ] + + # Snapshots are in dated descending order due to the names + if len(remote_snapshots) > keep_count: + remote_marked_for_deletion = [s["name"] for s in remote_snapshots[keep_count:]] + else: + remote_marked_for_deletion = list() + + for snapshot in remote_marked_for_deletion: + log_info( + celery, + f"VM {vm_detail['name']} removing stale remote automirror snapshot {snapshot}", + ) + session.delete( + f"{destination_api_uri}/vm/{vm_name}/snapshot", + params={ + "snapshot_name": snapshot, + }, + data=None, + ) + + session.close() + + return True, remote_marked_for_deletion + + +def worker_cluster_automirror( + zkhandler, + celery, + force_full=False, + email_recipients=None, + email_errors_only=False, +): + config = get_automirror_configuration() + + mirror_summary = dict() + local_deleted_snapshots = dict() + + current_stage = 0 + total_stages = 1 + + start( + celery, + f"Starting cluster '{config['cluster']}' VM automirror", + current=current_stage, + total=total_stages, + ) + + if not config["automirror_enabled"]: + message = "Automirrors are not configured on this cluster." + log_info(celery, message) + return finish( + celery, + message, + current=total_stages, + total=total_stages, + ) + + if email_recipients is not None: + total_stages += 1 + + automirror_start_time = datetime.now() + + retcode, vm_list = vm.get_list(zkhandler) + if not retcode: + error_message = f"Failed to fetch VM list: {vm_list}" + log_err(celery, error_message) + current_stage += 1 + send_execution_failure_report( + celery, + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + + mirror_vms = list() + for vm_detail in vm_list: + mirror_vm = { + "detail": vm_detail, + "destinations": list(), + } + vm_tag_names = [t["name"] for t in vm_detail["tags"]] + # Check if any of the mirror tags are present; if they are, then we should mirror + vm_mirror_tags = list() + for tag in vm_tag_names: + if tag.split(":")[0] in config["mirror_tags"]: + vm_mirror_tags.append(tag) + + # There are no mirror tags, so skip this VM + if len(vm_mirror_tags) < 1: + continue + + # Go through each tag to extract the cluster + target_clusters = set() + for tag in vm_mirror_tags: + if len(tag.split(":")) == 1: + # This is a direct match without any cluster suffix, so use the default + target_clusters.add(config["mirror_default_destination"]) + if len(tag.split(":")) > 1: + # This has a cluster suffix, so use that + target_clusters.add(tag.split(":")[1]) + + for cluster in target_clusters: + mirror_vm["destinations"].append(cluster) + + mirror_vms.append(mirror_vm) + + if len(mirror_vms) < 1: + message = "Found no VMs tagged for automirror." + log_info(celery, message) + return finish( + celery, + message, + current=total_stages, + total=total_stages, + ) + + total_stages += len(mirror_vms) + + mirror_vm_names = set([b["detail"]["name"] for b in mirror_vms]) + + log_info( + celery, + f"Found {len(mirror_vm_names)} suitable VM(s) for automirror: {', '.join(mirror_vm_names)}", + ) + + # Execute the backup: take a snapshot, then export the snapshot + for mirror_vm in mirror_vms: + vm_detail = mirror_vm["detail"] + vm_destinations = mirror_vm["destinations"] + + current_stage += 1 + update( + celery, + f"Performing automirror of VM {vm_detail['name']}", + current=current_stage, + total=total_stages, + ) + + # Automirrors use a custom name to allow them to be properly cleaned up later + now = datetime.now() + datestring = now.strftime("%Y%m%d%H%M%S") + snapshot_name = f"am{datestring}" + + result, message = vm.vm_worker_create_snapshot( + zkhandler, + None, + vm_detail["name"], + snapshot_name=snapshot_name, + return_status=True, + ) + if not result: + for destination in vm_destinations: + mirror_summary[f"{vm_detail['name']}:{destination}"] = { + "result": result, + "snapshot_name": snapshot_name, + "runtime_secs": 0, + "result_message": message, + } + continue + + remote_marked_for_deletion = dict() + for destination in vm_destinations: + mirror_start = datetime.now() + result, ret = run_vm_mirror( + zkhandler, + celery, + config, + vm_detail, + snapshot_name, + destination, + ) + mirror_end = datetime.now() + runtime_secs = (mirror_end - mirror_start).seconds + if result: + remote_marked_for_deletion[destination] = ret + + mirror_summary[f"{vm_detail['name']}:{destination}"] = { + "result": result, + "snapshot_name": snapshot_name, + "runtime_secs": runtime_secs, + } + else: + log_warn( + celery, + f"Error in mirror send: {ret}", + ) + mirror_summary[f"{vm_detail['name']}:{destination}"] = { + "result": result, + "snapshot_name": snapshot_name, + "runtime_secs": runtime_secs, + "result_message": ret, + } + + # Find all local snapshots that were present in all remote snapshot deletions, + # then remove them + # If one of the sends fails, this should result in nothing being removed + if remote_marked_for_deletion: + all_lists = [set(lst) for lst in remote_marked_for_deletion.values() if lst] + if all_lists: + local_marked_for_deletion = set.intersection(*all_lists) + else: + local_marked_for_deletion = set() + else: + local_marked_for_deletion = set() + + for snapshot in local_marked_for_deletion: + log_info( + celery, + f"VM {vm_detail['name']} removing stale local automirror snapshot {snapshot}", + ) + vm.vm_worker_remove_snapshot( + zkhandler, + None, + vm_detail["name"], + snapshot, + ) + + local_deleted_snapshots[vm_detail["name"]] = local_marked_for_deletion + + automirror_end_time = datetime.now() + automirror_total_time = automirror_end_time - automirror_start_time + + if email_recipients is not None: + current_stage += 1 + if email_errors_only and not all( + [s["result"] for _, s in mirror_summary.items()] + ): + # Send report if we're in errors only and at least one send failed + send_report = True + elif not email_errors_only: + # Send report if we're not in errors only + send_report = True + else: + # Otherwise (errors only and all successful) don't send + send_report = False + + if send_report: + update( + celery, + "Sending automirror results summary email", + current=current_stage, + total=total_stages, + ) + send_execution_summary_report( + celery, + config, + recipients=email_recipients, + total_time=automirror_total_time, + summary=mirror_summary, + local_deleted_snapshots=local_deleted_snapshots, + ) + else: + update( + celery, + "Skipping automirror results summary email (no failures)", + current=current_stage, + total=total_stages, + ) + + current_stage += 1 + return finish( + celery, + f"Successfully completed cluster '{config['cluster']}' VM automirror", + current=current_stage, + total=total_stages, + ) diff --git a/daemon-common/config.py b/daemon-common/config.py index 4024f997..4c17718d 100644 --- a/daemon-common/config.py +++ b/daemon-common/config.py @@ -481,6 +481,64 @@ def get_autobackup_configuration(): return config +def get_parsed_automirror_configuration(config_file): + """ + Load the configuration; this is the same main pvc.conf that the daemons read + """ + print('Loading configuration from file "{}"'.format(config_file)) + + with open(config_file, "r") as cfgfh: + try: + o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader) + except Exception as e: + print(f"ERROR: Failed to parse configuration file: {e}") + os._exit(1) + + config = dict() + + try: + o_cluster = o_config["cluster"] + config_cluster = { + "cluster": o_cluster["name"], + "automirror_enabled": True, + } + config = {**config, **config_cluster} + + o_automirror = o_config["automirror"] + if o_automirror is None: + config["automirror_enabled"] = False + return config + + config_automirror = { + "mirror_tags": o_automirror["mirror_tags"], + "mirror_destinations": o_automirror["destinations"], + "mirror_default_destination": o_automirror["default_destination"], + "mirror_keep_snapshots": o_automirror["keep_snapshots"], + } + config = {**config, **config_automirror} + + if config["mirror_default_destination"] not in [ + d for d in config["mirror_destinations"].keys() + ]: + raise Exception( + "Specified default mirror destination is not in the list of destinations" + ) + + except Exception as e: + raise MalformedConfigurationError(e) + + return config + + +def get_automirror_configuration(): + """ + Get the configuration. + """ + pvc_config_file = get_configuration_path() + config = get_parsed_automirror_configuration(pvc_config_file) + return config + + def validate_directories(config): if not os.path.exists(config["dynamic_directory"]): os.makedirs(config["dynamic_directory"]) diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 69e4d5f7..cc729912 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -2107,6 +2107,7 @@ def vm_worker_create_snapshot( domain, snapshot_name=None, zk_only=False, + return_status=False, ): if snapshot_name is None: now = datetime.now() @@ -2124,27 +2125,34 @@ def vm_worker_create_snapshot( # Validate that VM exists in cluster dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: - fail( - celery, - f"Could not find VM '{domain}' in the cluster", - ) - return False + message = (f"Could not find VM '{domain}' in the cluster",) + fail(celery, message) + if return_status: + return False, message + else: + return False reg = re.compile("^[a-z0-9.-_]+$") if not reg.match(snapshot_name): - fail( - celery, + message = ( "Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid)) if current_snapshots and snapshot_name in current_snapshots: - fail( - celery, + message = ( f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False # Get the list of all RBD volumes rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") @@ -2178,11 +2186,12 @@ def vm_worker_create_snapshot( ) if not ret: cleanup_failure() - fail( - celery, - msg.replace("ERROR: ", ""), - ) - return False + message = (msg.replace("ERROR: ", ""),) + fail(celery, message) + if return_status: + return False, message + else: + return False else: snap_list.append(f"{pool}/{volume}@{snapshot_name}") @@ -2242,12 +2251,22 @@ def vm_worker_create_snapshot( ) current_stage += 1 - return finish( - celery, - f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'", - current=current_stage, - total=total_stages, - ) + message = (f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",) + if return_status: + finish( + celery, + message, + current=current_stage, + total=total_stages, + ) + return True, message + else: + return finish( + celery, + message, + current=current_stage, + total=total_stages, + ) def vm_worker_remove_snapshot( @@ -3157,6 +3176,7 @@ def vm_worker_send_snapshot( destination_api_verify_ssl=True, incremental_parent=None, destination_storage_pool=None, + return_status=False, ): current_stage = 0 @@ -3171,11 +3191,12 @@ def vm_worker_send_snapshot( # Validate that VM exists in cluster dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: - fail( - celery, - f"Could not find VM '{domain}' in the cluster", - ) - return False + message = (f"Could not find VM '{domain}' in the cluster",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Get our side's VM configuration details try: @@ -3184,31 +3205,34 @@ def vm_worker_send_snapshot( vm_detail = None if not isinstance(vm_detail, dict): - fail( - celery, - f"VM listing returned invalid data: {vm_detail}", - ) - return False + message = (f"VM listing returned invalid data: {vm_detail}",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Check if the snapshot exists if not zkhandler.exists( ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) ): - fail( - celery, - f"Could not find snapshot '{snapshot_name}' of VM '{domain}'", - ) - return False + message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Check if the incremental parent exists if incremental_parent is not None and not zkhandler.exists( ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent) ): - fail( - celery, - f"Could not find snapshot '{snapshot_name}' of VM '{domain}'", - ) - return False + message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",) + fail(celery, message) + if return_status: + return False, message + else: + return False vm_name = vm_detail["name"] @@ -3234,23 +3258,26 @@ def vm_worker_send_snapshot( if "PVC API" not in response.json().get("message"): raise ValueError("Remote API is not a PVC API or incorrect URI given") except requests.exceptions.ConnectionError as e: - fail( - celery, - f"Connection to remote API timed out: {e}", - ) - return False + message = (f"Connection to remote API timed out: {e}",) + fail(celery, message) + if return_status: + return False, message + else: + return False except ValueError as e: - fail( - celery, - f"Connection to remote API is not valid: {e}", - ) - return False + message = (f"Connection to remote API is not valid: {e}",) + fail(celery, message) + if return_status: + return False, message + else: + return False except Exception as e: - fail( - celery, - f"Connection to remote API failed: {e}", - ) - return False + message = (f"Connection to remote API failed: {e}",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Hit the API "/status" endpoint to validate API key and cluster status response = session.get( @@ -3263,11 +3290,14 @@ def vm_worker_send_snapshot( "pvc_version", None ) if current_destination_pvc_version is None: - fail( - celery, + message = ( "Connection to remote API failed: no PVC version information returned", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False expected_destination_pvc_version = "0.9.101" # Work around development versions @@ -3278,11 +3308,14 @@ def vm_worker_send_snapshot( if parse_version(current_destination_pvc_version) < parse_version( expected_destination_pvc_version ): - fail( - celery, + message = ( f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False # Check if the VM already exists on the remote response = session.get( @@ -3301,11 +3334,12 @@ def vm_worker_send_snapshot( current_destination_vm_state is not None and current_destination_vm_state != "mirror" ): - fail( - celery, - "Remote PVC VM exists and is not a mirror", - ) - return False + message = ("Remote PVC VM exists and is not a mirror",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Get details about VM snapshot _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many( @@ -3351,31 +3385,38 @@ def vm_worker_send_snapshot( # Check if this snapshot is in the remote list already if snapshot_name in [s["name"] for s in destination_vm_snapshots]: - fail( - celery, - f"Snapshot {snapshot_name} already exists on the target", - ) - return False + message = (f"Snapshot {snapshot_name} already exists on the target",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Check if this snapshot is older than the latest remote VM snapshot if ( len(destination_vm_snapshots) > 0 and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"] ): - fail( - celery, + message = ( f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False # Check that our incremental parent exists on the remote VM if incremental_parent is not None: if incremental_parent not in [s["name"] for s in destination_vm_snapshots]: - fail( - celery, + message = ( f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False # Begin send, set stages total_stages += 1 + (3 * len(snapshot_rbdsnaps)) @@ -3393,6 +3434,25 @@ def vm_worker_send_snapshot( "source_snapshot": incremental_parent, } + # Strip out autobackup and automirror tags + # These should never be wanted on the receiving side + from daemon_lib.config import ( + get_autobackup_configuration, + get_automirror_configuration, + ) + + autobackup_config = get_autobackup_configuration() + automirror_config = get_automirror_configuration() + new_tags = list() + for tag in vm_detail["tags"]: + tag_base = tag["name"].split(":")[0] + if tag_base in [t for t in autobackup_config["backup_tags"]] or tag_base in [ + t for t in automirror_config["mirror_tags"] + ]: + continue + new_tags.append(tag) + vm_detail["tags"] = new_tags + response = session.post( f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", headers={"Content-Type": "application/json"}, @@ -3400,11 +3460,12 @@ def vm_worker_send_snapshot( json=vm_detail, ) if response.status_code != 200: - fail( - celery, - f"Failed to send config: {response.json()['message']}", - ) - return False + message = (f"Failed to send config: {response.json()['message']}",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Create the block devices on the remote side if this is a new VM send block_t_start = time.time() @@ -3431,11 +3492,12 @@ def vm_worker_send_snapshot( error_message = f"Multiple details returned for volume {rbd_name}" else: error_message = f"Error getting details for volume {rbd_name}" - fail( - celery, - error_message, - ) - return False + message = (error_message,) + fail(celery, message) + if return_status: + return False, message + else: + return False try: local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"]) @@ -3460,11 +3522,12 @@ def vm_worker_send_snapshot( data=None, ) if response.status_code != 404 and current_destination_vm_state is None: - fail( - celery, - f"Remote storage pool {pool} already contains volume {volume}", - ) - return False + message = (f"Remote storage pool {pool} already contains volume {volume}",) + fail(celery, message) + if return_status: + return False, message + else: + return False if current_destination_vm_state is not None: try: @@ -3474,7 +3537,10 @@ def vm_worker_send_snapshot( except Exception as e: error_message = f"Failed to get volume size for remote {rbd_name}: {e}" fail(celery, error_message) - return False + if return_status: + return False, error_message + else: + return False if local_volume_size != remote_volume_size: response = session.put( @@ -3482,11 +3548,12 @@ def vm_worker_send_snapshot( params={"new_size": local_volume_size, "force": True}, ) if response.status_code != 200: - fail( - celery, - "Failed to resize remote volume to match local volume", - ) - return False + message = ("Failed to resize remote volume to match local volume",) + fail(celery, message) + if return_status: + return False, message + else: + return False # Send the volume to the remote cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") @@ -3557,11 +3624,14 @@ def vm_worker_send_snapshot( stream=True, ) if response.status_code != 200: - fail( - celery, + message = ( f"Failed to send diff batch: {response.json()['message']}", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False current_chunk_time = time.time() chunk_time = current_chunk_time - last_chunk_time @@ -3609,11 +3679,12 @@ def vm_worker_send_snapshot( buffer.clear() # Clear the buffer after sending buffer_size = 0 # Reset buffer size except Exception: - fail( - celery, - f"Failed to send snapshot: {response.json()['message']}", - ) - return False + message = (f"Failed to send snapshot: {response.json()['message']}",) + fail(celery, message) + if return_status: + return False, message + else: + return False finally: image.close() ioctx.close() @@ -3657,11 +3728,14 @@ def vm_worker_send_snapshot( data=full_chunker(), ) if response.status_code != 200: - fail( - celery, + message = ( f"Failed to send snapshot: {response.json()['message']}", ) - return False + fail(celery, message) + if return_status: + return False, message + else: + return False finally: image.close() ioctx.close() @@ -3678,11 +3752,12 @@ def vm_worker_send_snapshot( params=send_params, ) if response.status_code != 200: - fail( - celery, - f"Failed to send snapshot: {response.json()['message']}", - ) - return False + message = (f"Failed to send snapshot: {response.json()['message']}",) + fail(celery, message) + if return_status: + return False, message + else: + return False finally: image.close() ioctx.close() @@ -3692,12 +3767,24 @@ def vm_worker_send_snapshot( block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1) current_stage += 1 - return finish( - celery, + message = ( f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)", - current=current_stage, - total=total_stages, ) + if return_status: + finish( + celery, + message, + current=current_stage, + total=total_stages, + ) + return True, message + else: + return finish( + celery, + message, + current=current_stage, + total=total_stages, + ) def vm_worker_create_mirror( diff --git a/pvc.sample.conf b/pvc.sample.conf index ab2d5411..46af29f8 100644 --- a/pvc.sample.conf +++ b/pvc.sample.conf @@ -393,6 +393,8 @@ api: private_key: "" # Automatic backups +# If this section is present, autobackups will be enabled; otherwise, they will be disabled. +# The pvc-ansible roles manage this including the various timer units, so avoid adjusting this manually. autobackup: # Backup root path on the node, used as the remote mountpoint @@ -451,5 +453,55 @@ autobackup: # This example shows a generic umount leveraging the backup_root_path variable - "/usr/bin/umount {backup_root_path}" +# Automatic mirroring to peer clusters +# If this section is present, automirrors will be enabled; otherwise, they will be disabled. +# The pvc-ansible roles manage this including the various timer units, so avoid adjusting this manually. +automirror: + + # Destination clusters + # A list of destination cluster API endpoints to send mirrors to. + # For each entry, the "name" field will be mapped to the "{cluster}" variable in the tag(s) + # above. For more details on how exactly this works, please consult the documentation. + destinations: + + # An example entry; contains the same information as a "pvc connection" entry + # The key in this dictionary is the "name" of the cluster, which is what must be suffixed + # to a tag and is displayed in the report and status output. + cluster2: + # The destination address, either an IP or an FQDN the destination API is reachable at + address: pvc.cluster2.mydomain.tld + # The destination port (usually 7370) + port: 7370 + # The API prefix (usually '/api/v1') without a trailing slash + prefix: "/api/v1" + # The API key of the destination + key: 00000000-0000-0000-0000-000000000000 + # Whether or not to use SSL for the connection + ssl: yes + # Whether or not to verify SSL for the connection + verify_ssl: yes + # Storage pool for VMs on the destination + pool: vms + + # Default destination + # The cluster name to send mirrors to for VMs without an explicit "{cluster}" tag + # Always required, even if there is only a single destination + default_destination: cluster2 + + # VM tag(s) to mirror + # Only VMs with at least one of the given tag(s) will be mirrored; all others will be skipped + # All mirror tags support suffixing a ":{cluster}" argument, which will override the default + # cluster and send mirrors to the given cluster name (in the list below). Multiple suffixed + # tags are supported; if more than one is, the VM will be mirrored to all specified clusters. + mirror_tags: + - "automirror" + + # The number of snapshots to keep, on both sides - mirror snapshots older than the last + # X snapshots will be automatically removed to save space + # Depending on the interval specified in the pvc-ansible variables, this may be either a + # relatively short or relatively long time. + keep_snapshots: 7 + + # VIM modeline, requires "set modeline" in your VIMRC # vim: expandtab shiftwidth=2 tabstop=2 filetype=yaml diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py index b347e770..9947f203 100755 --- a/worker-daemon/pvcworkerd/Daemon.py +++ b/worker-daemon/pvcworkerd/Daemon.py @@ -53,6 +53,9 @@ from daemon_lib.vmbuilder import ( from daemon_lib.autobackup import ( worker_cluster_autobackup, ) +from daemon_lib.automirror import ( + worker_cluster_automirror, +) # Daemon version version = "0.9.103" @@ -122,6 +125,26 @@ def cluster_autobackup(self, force_full=False, email_recipients=None, run_on="pr ) +@celery.task(name="cluster.automirror", bind=True, routing_key="run_on") +def cluster_automirror( + self, email_recipients=None, email_errors_only=False, run_on="primary" +): + @ZKConnection(config) + def run_cluster_automirror( + zkhandler, self, email_recipients=None, email_errors_only=False + ): + return worker_cluster_automirror( + zkhandler, + self, + email_recipients=email_recipients, + email_errors_only=email_errors_only, + ) + + return run_cluster_automirror( + self, email_recipients=email_recipients, email_errors_only=email_errors_only + ) + + @celery.task(name="vm.flush_locks", bind=True, routing_key="run_on") def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"): @ZKConnection(config)