From f57b8d4a1531b4033c9f26f40a9fb3f635d44efb Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface"
Date: Sun, 25 Aug 2024 21:41:52 -0400
Subject: [PATCH] Simplify Celery event handling

It was far too cumbersome to report every possible stage here in a
consistent way. Realistically, this command will be run silently from
cron 99.95% of the time, so all this overcomplexity to handle individual
Celery state updates just isn't worth it.
---
 client-cli/pvc/cli/cli.py   |   4 +-
 daemon-common/autobackup.py | 237 +++++++++---------------------------
 2 files changed, 63 insertions(+), 178 deletions(-)

diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py
index feebf952..a1aa71e4 100644
--- a/client-cli/pvc/cli/cli.py
+++ b/client-cli/pvc/cli/cli.py
@@ -2190,9 +2190,11 @@ def cli_vm_autobackup(email_report, force_full_flag, wait_flag, cron_flag):
 
     This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines
     how often backups are taken. Backup format (full/incremental) and retention is based only on the number of
-    recorded backups, not on the time interval between them. Backups taken manually outside of the "autobackup"
+    recorded backups, not on the time interval between them. Exports taken manually outside of the "autobackup"
     command are not counted towards the format or retention of autobackups.
 
+    WARNING: Running this command manually will interfere with the schedule! Do not run manually except for testing.
+
     The actual details of the autobackup, including retention policies, full-vs-incremental, pre- and post- run
     mounting/unmounting commands, etc. are defined in the main PVC configuration file `/etc/pvc/pvc.conf`. See
     the sample configuration for more details.
diff --git a/daemon-common/autobackup.py b/daemon-common/autobackup.py
index dae46e12..07463b9c 100644
--- a/daemon-common/autobackup.py
+++ b/daemon-common/autobackup.py
@@ -132,14 +132,14 @@ def send_execution_summary_report(
         backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
         if backup.get("result", False):
             email.append(
-                f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
+                f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {backup.get('snapshot_name')}, type {backup.get('type', 'unknown')}"
             )
             email.append(
                 f" Backup contains {len(backup.get('export_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('export_size_bytes', 0))} ({backup.get('export_size_bytes', 0)} bytes)"
             )
         else:
             email.append(
-                f" {backup_date}: Failure in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
+                f" {backup_date}: Failure in {backup.get('runtime_secs', 0)} seconds, ID {backup.get('snapshot_name')}, type {backup.get('type', 'unknown')}"
             )
             email.append(f" {backup.get('result_message')}")
 
@@ -150,14 +150,10 @@ def send_execution_summary_report(
         log_err(f"Failed to send report email: {e}")
 
 
-def run_vm_backup(
-    zkhandler, celery, current_stage, total_stages, config, vm_detail, force_full=False
-):
+def run_vm_backup(zkhandler, celery, config, vm_detail, force_full=False):
     vm_name = vm_detail["name"]
     dom_uuid = vm_detail["uuid"]
-    backup_suffixed_path = (
-        f"{config['backup_root_path']}/{config['backup_root_suffix']}"
-    )
+    backup_suffixed_path = f"{config['backup_root_path']}{config['backup_root_suffix']}"
     vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
     autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
     full_interval = config["backup_schedule"]["full_interval"]
config["backup_schedule"]["full_interval"] @@ -183,7 +179,7 @@ def run_vm_backup( this_backup_incremental_parent = None this_backup_retain_snapshot = True else: - this_backup_incremental_parent = last_full_backup["datestring"] + this_backup_incremental_parent = last_full_backup["snapshot_name"] this_backup_retain_snapshot = False else: # The very first ackup must be full to start the tree @@ -210,9 +206,13 @@ def run_vm_backup( backup_json_file = ( f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json" ) - with open(backup_json_file) as fh: - backup_json = jload(fh) - tracked_backups.insert(0, backup_json) + try: + with open(backup_json_file) as fh: + backup_json = jload(fh) + tracked_backups.insert(0, backup_json) + except Exception as e: + log_err(celery, f"Could not open export JSON: {e}") + return list() state_data["tracked_backups"] = tracked_backups with open(autobackup_state_file, "w") as fh: @@ -255,22 +255,8 @@ def run_vm_backup( ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") - final_stage = ( - current_stage - + 5 - + len(rbd_list) - + (len(rbd_list) if this_backup_retain_snapshot else 0) - ) for rbd in rbd_list: - current_stage += 1 - update( - celery, - f"[{vm_name}] Creating RBD snapshot of {rbd}", - current=current_stage, - total=total_stages, - ) - pool, volume = rbd.split("/") ret, msg = ceph.add_snapshot( zkhandler, pool, volume, snapshot_name, zk_only=False @@ -286,24 +272,9 @@ def run_vm_backup( if failure: error_message = (f"[{vm_name}] Error in snapshot export, skipping",) - current_stage = final_stage write_backup_summary(message=error_message) tracked_backups = update_tracked_backups() - update( - celery, - error_message, - current=current_stage, - total=total_stages, - ) - return tracked_backups, current_stage - - current_stage += 1 - update( - celery, - f"[{vm_name}] Creating VM configuration snapshot", - current=current_stage, - total=total_stages, - ) + return tracked_backups # Get the current domain XML vm_config = zkhandler.read(("domain.xml", dom_uuid)) @@ -351,17 +322,10 @@ def run_vm_backup( ) if not ret: error_message = (f"[{vm_name}] Error in snapshot export, skipping",) - current_stage = final_stage log_err(celery, error_message) write_backup_summary(message=error_message) tracked_backups = update_tracked_backups() - update( - celery, - error_message, - current=current_stage, - total=total_stages, - ) - return tracked_backups, current_stage + return tracked_backups # Export the snapshot (vm.vm_worker_export_snapshot) export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images" @@ -372,17 +336,8 @@ def run_vm_backup( error_message = ( f"[{vm_name}] Failed to create target directory '{export_target_path}': {e}", ) - current_stage = final_stage log_err(celery, error_message) - write_backup_summary(message=error_message) - tracked_backups = update_tracked_backups() - update( - celery, - f"[{vm_name}] Error in snapshot export, skipping", - current=current_stage, - total=total_stages, - ) - return tracked_backups, current_stage + return tracked_backups def export_cleanup(): from shutil import rmtree @@ -411,15 +366,6 @@ def run_vm_backup( snap_volume = snapshot_volume["volume"] snap_snapshot_name = snapshot_volume["snapshot"] snap_size = snapshot_volume["stats"]["size"] - snap_str = f"{snap_pool}/{snap_volume}@{snap_snapshot_name}" - - current_stage += 1 - update( - celery, - f"[{vm_name}] Exporting RBD snapshot {snap_str}", - 
-            current=current_stage,
-            total=total_stages,
-        )
 
         if this_backup_incremental_parent is not None:
             retcode, stdout, stderr = run_os_command(
@@ -457,25 +403,10 @@ def run_vm_backup(
         )
 
     if failure:
-        current_stage = final_stage
         log_err(celery, error_message)
         write_backup_summary(message=error_message)
         tracked_backups = update_tracked_backups()
-        update(
-            celery,
-            error_message,
-            current=current_stage,
-            total=total_stages,
-        )
-        return tracked_backups, current_stage
-
-    current_stage += 1
-    update(
-        celery,
-        f"[{vm_name}] Writing snapshot details",
-        current=current_stage,
-        total=total_stages,
-    )
+        return tracked_backups
 
     def get_dir_size(pathname):
         total = 0
@@ -492,29 +423,14 @@ def run_vm_backup(
     ret, e = write_backup_summary(success=True)
     if not ret:
         error_message = (f"[{vm_name}] Failed to export configuration snapshot: {e}",)
-        current_stage = final_stage
         log_err(celery, error_message)
         write_backup_summary(message=error_message)
         tracked_backups = update_tracked_backups()
-        update(
-            celery,
-            error_message,
-            current=current_stage,
-            total=total_stages,
-        )
-        return tracked_backups, current_stage
+        return tracked_backups
 
     # Clean up the snapshot (vm.vm_worker_remove_snapshot)
     if not this_backup_retain_snapshot:
         for snap in snap_list:
-            current_stage += 1
-            update(
-                celery,
-                f"[{vm_name}] Removing RBD snapshot {snap}",
-                current=current_stage,
-                total=total_stages,
-            )
-
             rbd, name = snap.split("@")
             pool, volume = rbd.split("/")
             ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name)
@@ -524,25 +440,10 @@ def run_vm_backup(
                 break
 
         if failure:
-            current_stage = final_stage
             log_err(celery, error_message)
             write_backup_summary(message=error_message)
             tracked_backups = update_tracked_backups()
-            update(
-                celery,
-                error_message,
-                current=current_stage,
-                total=total_stages,
-            )
-            return tracked_backups, current_stage
-
-        current_stage += 1
-        update(
-            celery,
-            f"[{vm_name}] Removing VM configuration snapshot",
-            current=current_stage,
-            total=total_stages,
-        )
+            return tracked_backups
 
         ret = zkhandler.delete(
             ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
@@ -551,14 +452,6 @@ def run_vm_backup(
             error_message = (f"[{vm_name}] Failed to remove VM snapshot; continuing",)
             log_err(celery, error_message)
 
-    current_stage += 1
-    update(
-        celery,
-        f"Finding obsolete incremental backups for '{vm_name}'",
-        current=current_stage,
-        total=total_stages,
-    )
-
     marked_for_deletion = list()
     # Find any full backups that are expired
     found_full_count = 0
@@ -570,19 +463,11 @@ def run_vm_backup(
     # Find any incremental backups that depend on marked parents
     for backup in tracked_backups:
         if backup["type"] == "incremental" and backup["incremental_parent"] in [
-            b["datestring"] for b in marked_for_deletion
+            b["snapshot_name"] for b in marked_for_deletion
         ]:
             marked_for_deletion.append(backup)
 
-    current_stage += 1
     if len(marked_for_deletion) > 0:
-        update(
-            celery,
-            f"Cleaning up aged out backups for '{vm_name}'",
-            current=current_stage,
-            total=total_stages,
-        )
-
         for backup_to_delete in marked_for_deletion:
             ret = vm.vm_worker_remove_snapshot(
                 zkhandler, None, vm_name, backup_to_delete["snapshot_name"]
@@ -594,15 +479,8 @@ def run_vm_backup(
             rmtree(f"{vm_backup_path}/{backup_to_delete['snapshot_name']}")
             tracked_backups.remove(backup_to_delete)
 
-    current_stage += 1
-    update(
-        celery,
-        "Updating tracked backups",
-        current=current_stage,
-        total=total_stages,
-    )
     tracked_backups = update_tracked_backups()
-    return tracked_backups, current_stage
+    return tracked_backups
 
 
 def worker_cluster_autobackup(
@@ -649,9 +527,7 @@ def worker_cluster_autobackup(
         fail(celery, error_message)
         return False
 
-    backup_suffixed_path = (
-        f"{config['backup_root_path']}/{config['backup_root_suffix']}"
-    )
+    backup_suffixed_path = f"{config['backup_root_path']}{config['backup_root_suffix']}"
     if not path.exists(backup_suffixed_path):
         makedirs(backup_suffixed_path)
 
@@ -682,35 +558,7 @@ def worker_cluster_autobackup(
     total_stages += len(config["mount_cmds"])
     total_stages += len(config["unmount_cmds"])
 
-    for vm_detail in backup_vms:
-        total_disks = len([d for d in vm_detail["disks"] if d["type"] == "rbd"])
-        total_stages += 2 + (2 * total_disks)
-
-        vm_name = vm_detail["name"]
-        vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
-        autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
-        if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
-            # There are no existing backups so the list is empty
-            state_data = dict()
-            tracked_backups = list()
-        else:
-            with open(autobackup_state_file) as fh:
-                state_data = jload(fh)
-            tracked_backups = state_data["tracked_backups"]
-
-        full_backups = [b for b in tracked_backups if b["type"] == "full"]
-        if len(full_backups) > 0:
-            last_full_backup = full_backups[0]
-            last_full_backup_idx = tracked_backups.index(last_full_backup)
-            if force_full or last_full_backup_idx >= full_interval - 1:
-                this_backup_retain_snapshot = True
-            else:
-                this_backup_retain_snapshot = False
-        else:
-            # The very first ackup must be full to start the tree
-            this_backup_retain_snapshot = True
-        if this_backup_retain_snapshot:
-            total_stages += total_disks
+    total_stages += len(backup_vms)
 
     log_info(
         celery,
@@ -749,11 +597,46 @@ def worker_cluster_autobackup(
 
     # Execute the backup: take a snapshot, then export the snapshot
     for vm_detail in backup_vms:
-        summary, current_stage = run_vm_backup(
+        vm_backup_path = f"{backup_suffixed_path}/{vm_detail['name']}"
+        autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
+        if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
+            # There are no existing backups so the list is empty
+            state_data = dict()
+            tracked_backups = list()
+        else:
+            with open(autobackup_state_file) as fh:
+                state_data = jload(fh)
+            tracked_backups = state_data["tracked_backups"]
+
+        full_backups = [b for b in tracked_backups if b["type"] == "full"]
+        if len(full_backups) > 0:
+            last_full_backup = full_backups[0]
+            last_full_backup_idx = tracked_backups.index(last_full_backup)
+            if force_full:
+                this_backup_incremental_parent = None
+            elif last_full_backup_idx >= full_interval - 1:
+                this_backup_incremental_parent = None
+            else:
+                this_backup_incremental_parent = last_full_backup["snapshot_name"]
+        else:
+            # The very first backup must be full to start the tree
+            this_backup_incremental_parent = None
+
+        export_type = (
+            "incremental" if this_backup_incremental_parent is not None else "full"
+        )
+
+        current_stage += 1
+        update(
+            celery,
+            f"Performing autobackup of VM {vm_detail['name']} ({export_type})",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        summary = run_vm_backup(
             zkhandler,
             celery,
-            current_stage,
-            total_stages,
             config,
             vm_detail,
             force_full=force_full,
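
For reference, the following is a minimal, self-contained sketch (not part of the patch) of the full-vs-incremental decision that the reworked worker_cluster_autobackup loop now makes per VM before calling run_vm_backup. The helper name decide_backup_type and the sample snapshot names are illustrative assumptions; the real code reads tracked_backups (newest first) from each VM's .autobackup.json state file and full_interval from the backup_schedule configuration.

# Illustrative sketch only; mirrors the per-VM decision logic added above.
def decide_backup_type(tracked_backups, full_interval, force_full=False):
    """Return (export_type, incremental_parent) for the next autobackup."""
    full_backups = [b for b in tracked_backups if b["type"] == "full"]
    if not full_backups or force_full:
        # No full backup exists yet (or a full was forced): (re)start the tree
        return "full", None

    last_full_backup = full_backups[0]
    last_full_backup_idx = tracked_backups.index(last_full_backup)
    if last_full_backup_idx >= full_interval - 1:
        # Enough backups have accumulated since the last full; take a new full
        return "full", None

    # Otherwise take an incremental against the most recent full backup
    return "incremental", last_full_backup["snapshot_name"]


# Hypothetical state: two incrementals since the last full, with full_interval=7
tracked = [
    {"type": "incremental", "snapshot_name": "backup3"},
    {"type": "incremental", "snapshot_name": "backup2"},
    {"type": "full", "snapshot_name": "backup1"},
]
print(decide_backup_type(tracked, full_interval=7))
# -> ('incremental', 'backup1')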