From f1668bffcc0e96d55fcb6016affd72c77c10348b Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sun, 25 Aug 2024 15:18:30 -0400 Subject: [PATCH] Refactor autobackups to implement vm.worker defs Avoid trying to subcall other Celery worker tasks, as this just gets very screwy with the stages. Instead reimplement what is needed directly here. While this does cause a fair bit of code duplication, I believe the resulting clarity is worthwhile. --- daemon-common/autobackup.py | 433 +++++++++++++++++++++++++++++------- 1 file changed, 351 insertions(+), 82 deletions(-) diff --git a/daemon-common/autobackup.py b/daemon-common/autobackup.py index 017103e8..e86be7b9 100644 --- a/daemon-common/autobackup.py +++ b/daemon-common/autobackup.py @@ -22,15 +22,17 @@ from datetime import datetime from json import load as jload from json import dump as jdump -from os import popen, makedirs, path +from os import popen, makedirs, path, scandir from shutil import rmtree from subprocess import run, PIPE +from time import time +from daemon_lib.common import run_os_command from daemon_lib.config import get_autobackup_configuration from daemon_lib.celery import start, fail, log_info, log_err, update, finish -import daemon_lib.ceph as pvc_ceph -import daemon_lib.vm as pvc_vm +import daemon_lib.ceph as ceph +import daemon_lib.vm as vm def send_execution_failure_report( @@ -43,7 +45,7 @@ def send_execution_failure_report( from socket import gethostname log_message = f"Sending email failure report to {', '.join(recipients)}" - log_info(log_message) + log_info(celery_conf[0], log_message) update( celery_conf[0], log_message, @@ -92,7 +94,7 @@ def send_execution_summary_report( from socket import gethostname log_message = f"Sending email summary report to {', '.join(recipients)}" - log_info(log_message) + log_info(celery_conf[0], log_message) update( celery_conf[0], log_message, @@ -124,9 +126,9 @@ def send_execution_summary_report( ) email.append("") - for vm in summary.keys(): - email.append(f"VM: {vm}:") - for backup in summary[vm]: + for vm_name in summary.keys(): + email.append(f"VM: {vm_name}:") + for backup in summary[vm_name]: datestring = backup.get("datestring") backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S") if backup.get("result", False): @@ -134,7 +136,7 @@ def send_execution_summary_report( f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}" ) email.append( - f" Backup contains {len(backup.get('backup_files'))} files totaling {pvc_ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)" + f" Backup contains {len(backup.get('backup_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)" ) else: email.append( @@ -180,7 +182,7 @@ def worker_cluster_autobackup( autobackup_start_time = datetime.now() - retcode, vm_list = pvc_vm.get_list(zkhandler) + retcode, vm_list = vm.get_list(zkhandler) if not retcode: error_message = f"Failed to fetch VM list: {vm_list}" log_err(celery, error_message) @@ -193,16 +195,25 @@ def worker_cluster_autobackup( fail(celery, error_message) return False + backup_suffixed_path = ( + f"{config['backup_root_path']}/{config['backup_root_suffix']}" + ) + if not path.exists(backup_suffixed_path): + makedirs(backup_suffixed_path) + + full_interval = config["backup_schedule"]["full_interval"] + full_retention = config["backup_schedule"]["full_retention"] + backup_vms = list() - for vm in vm_list: - vm_tag_names = [t["name"] for t in vm["tags"]] + for vm_detail in vm_list: + vm_tag_names = [t["name"] for t in vm_detail["tags"]] matching_tags = ( True if len(set(vm_tag_names).intersection(set(config["backup_tags"]))) > 0 else False ) if matching_tags: - backup_vms.append(vm) + backup_vms.append(vm_detail) if len(backup_vms) < 1: message = "Found no VMs tagged for autobackup." @@ -217,13 +228,40 @@ def worker_cluster_autobackup( if config["auto_mount_enabled"]: total_stages += len(config["mount_cmds"]) total_stages += len(config["unmount_cmds"]) - for vm in backup_vms: - total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"]) - total_stages += 2 + 1 + 2 + 2 + 3 * total_disks + + for vm_detail in backup_vms: + total_disks = len([d for d in vm_detail["disks"] if d["type"] == "rbd"]) + total_stages += 2 + (2 * total_disks) + + vm_name = vm_detail["name"] + vm_backup_path = f"{backup_suffixed_path}/{vm_name}" + autobackup_state_file = f"{vm_backup_path}/.autobackup.json" + if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file): + # There are no existing backups so the list is empty + state_data = dict() + tracked_backups = list() + else: + with open(autobackup_state_file) as fh: + state_data = jload(fh) + tracked_backups = state_data["tracked_backups"] + + full_backups = [b for b in tracked_backups if b["type"] == "full"] + if len(full_backups) > 0: + last_full_backup = full_backups[0] + last_full_backup_idx = tracked_backups.index(last_full_backup) + if force_full or last_full_backup_idx >= full_interval - 1: + this_backup_retain_snapshot = True + else: + this_backup_retain_snapshot = False + else: + # The very first ackup must be full to start the tree + this_backup_retain_snapshot = True + if this_backup_retain_snapshot: + total_stages += total_disks log_info( celery, - f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join(vm_list)}", + f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join([b['name'] for b in backup_vms])}", ) # Handle automount mount commands @@ -257,17 +295,9 @@ def worker_cluster_autobackup( return False # Execute the backup: take a snapshot, then export the snapshot - backup_suffixed_path = ( - f"{config['backup_root_path']}/{config['backup_root_suffix']}" - ) - if not path.exists(backup_suffixed_path): - makedirs(backup_suffixed_path) - - full_interval = config["backup_schedule"]["full_interval"] - full_retention = config["backup_schedule"]["full_retention"] - - for vm in backup_vms: - vm_name = vm["name"] + for vm_detail in backup_vms: + vm_name = vm_detail["name"] + dom_uuid = vm_detail["uuid"] vm_backup_path = f"{backup_suffixed_path}/{vm_name}" autobackup_state_file = f"{vm_backup_path}/.autobackup.json" if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file): @@ -301,60 +331,35 @@ def worker_cluster_autobackup( datestring = now.strftime("%Y%m%d%H%M%S") snapshot_name = f"autobackup_{datestring}" - # Take the snapshot - ret = pvc_vm.vm_worker_create_snapshot( - zkhandler, - celery, - vm_name, - snapshot_name=snapshot_name, - override_current_stage=current_stage, - override_total_stages=total_stages, - ) - if ret is False: - error_message = f"Failed to create backup snapshot '{snapshot_name}'" - log_err(celery, error_message) - send_execution_failure_report( - (celery, current_stage, total_stages), - config, - recipients=email_recipients, - error=error_message, - ) - return False + # Take the VM snapshot (vm.vm_worker_create_snapshot) + snap_list = list() - # Export the snapshot - ret = pvc_vm.vm_worker_export_snapshot( - zkhandler, - celery, - vm_name, - snapshot_name, - backup_suffixed_path, - incremental_parent=this_backup_incremental_parent, - override_current_stage=current_stage, - override_total_stages=total_stages, - ) - if ret is False: - error_message = f"Failed to export backup snapshot '{snapshot_name}'" - log_err(celery, error_message) - send_execution_failure_report( - (celery, current_stage, total_stages), - config, - recipients=email_recipients, - error=error_message, - ) - return False + def cleanup_failure(): + for snapshot in snap_list: + rbd, snapshot_name = snapshot.split("@") + pool, volume = rbd.split("/") + # We capture no output here, because if this fails too we're in a deep + # error chain and will just ignore it + ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) - # Clean up the snapshot - if not this_backup_retain_snapshot: - ret = pvc_vm.vm_worker_remove_snapshot( - zkhandler, + rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") + + for rbd in rbd_list: + current_stage += 1 + update( celery, - vm_name, - snapshot_name, - override_current_stage=current_stage, - override_total_stages=total_stages, + f"[VM {vm_name}] Creating RBD snapshot of {rbd}", + current=current_stage, + total=total_stages, ) - if ret is False: - error_message = f"Failed to remove backup snapshot '{snapshot_name}'" + + pool, volume = rbd.split("/") + ret, msg = ceph.add_snapshot( + zkhandler, pool, volume, snapshot_name, zk_only=False + ) + if not ret: + cleanup_failure() + error_message = msg.replace("ERROR: ", "") log_err(celery, error_message) send_execution_failure_report( (celery, current_stage, total_stages), @@ -362,10 +367,274 @@ def worker_cluster_autobackup( recipients=email_recipients, error=error_message, ) + fail(celery, error_message) return False + else: + snap_list.append(f"{pool}/{volume}@{snapshot_name}") + + current_stage += 1 + update( + celery, + f"[VM {vm_name}] Creating VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + # Get the current timestamp + tstart = time() + + # Get the current domain XML + vm_config = zkhandler.read(("domain.xml", dom_uuid)) + + # Add the snapshot entry to Zookeeper + zkhandler.write( + [ + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.name", + snapshot_name, + ), + snapshot_name, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.timestamp", + snapshot_name, + ), + tstart, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.xml", + snapshot_name, + ), + vm_config, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.rbd_snapshots", + snapshot_name, + ), + ",".join(snap_list), + ), + ] + ) + + # Export the snapshot (vm.vm_worker_export_snapshot) + export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images" + + try: + makedirs(export_target_path) + except Exception as e: + error_message = ( + f"[VM {vm_name}] Failed to create target directory '{export_target_path}': {e}", + ) + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + + def export_cleanup(): + from shutil import rmtree + + rmtree(f"{backup_suffixed_path}/{vm_name}/{snapshot_name}") + + export_type = ( + "incremental" if this_backup_incremental_parent is not None else "full" + ) + + # Set the export filetype + if this_backup_incremental_parent is not None: + export_fileext = "rbddiff" else: - total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"]) - current_stage += 2 + total_disks + export_fileext = "rbdimg" + + snapshot_volumes = list() + for rbdsnap in snap_list: + pool, _volume = rbdsnap.split("/") + volume, name = _volume.split("@") + ret, snapshots = ceph.get_list_snapshot( + zkhandler, pool, volume, limit=name, is_fuzzy=False + ) + if ret: + snapshot_volumes += snapshots + + export_files = list() + for snapshot_volume in snapshot_volumes: + snap_pool = snapshot_volume["pool"] + snap_volume = snapshot_volume["volume"] + snap_snapshot_name = snapshot_volume["snapshot"] + snap_size = snapshot_volume["stats"]["size"] + snap_str = f"{snap_pool}/{snap_volume}@{snap_snapshot_name}" + + current_stage += 1 + update( + celery, + f"[VM {vm_name}] Exporting {snap_str}", + current=current_stage, + total=total_stages, + ) + + if this_backup_incremental_parent is not None: + retcode, stdout, stderr = run_os_command( + f"rbd export-diff --from-snap {this_backup_incremental_parent} {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}" + ) + if retcode: + error_message = ( + f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'", + ) + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + else: + export_files.append( + ( + f"images/{snap_pool}.{snap_volume}.{export_fileext}", + snap_size, + ) + ) + else: + retcode, stdout, stderr = run_os_command( + f"rbd export --export-format 2 {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}" + ) + if retcode: + error_message = ( + f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'", + ) + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + else: + export_files.append( + ( + f"images/{snap_pool}.{snap_volume}.{export_fileext}", + snap_size, + ) + ) + + current_stage += 1 + update( + celery, + f"[VM {vm_name}] Writing snapshot details", + current=current_stage, + total=total_stages, + ) + + def get_dir_size(pathname): + total = 0 + with scandir(pathname) as it: + for entry in it: + if entry.is_file(): + total += entry.stat().st_size + elif entry.is_dir(): + total += get_dir_size(entry.path) + return total + + export_files_size = get_dir_size(export_target_path) + + export_details = { + "type": export_type, + "datestring": datestring, + "snapshot_name": snapshot_name, + "incremental_parent": this_backup_incremental_parent, + "vm_detail": vm_detail, + "export_files": export_files, + "export_size_bytes": export_files_size, + } + try: + with open( + f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json", "w" + ) as fh: + jdump(export_details, fh) + except Exception as e: + error_message = ( + f"[VM {vm_name}] Failed to export configuration snapshot: {e}", + ) + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + + # Clean up the snapshot (vm.vm_worker_remove_snapshot) + if not this_backup_retain_snapshot: + for snap in snap_list: + current_stage += 1 + update( + celery, + f"[VM {vm_name}] Removing {snap}", + current=current_stage, + total=total_stages, + ) + + rbd, name = snap.split("@") + pool, volume = rbd.split("/") + ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name) + if not ret: + error_message = msg.replace("ERROR: ", "") + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False + + current_stage += 1 + update( + celery, + f"[VM {vm_name}] Removing VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + ret = zkhandler.delete( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) + ) + if not ret: + error_message = ( + f"[VM {vm_name}] Failed to remove snapshot from Zookeeper", + ) + log_err(celery, error_message) + send_execution_failure_report( + (celery, current_stage, total_stages), + config, + recipients=email_recipients, + error=error_message, + ) + fail(celery, error_message) + return False current_stage += 1 update( @@ -406,7 +675,7 @@ def worker_cluster_autobackup( ) for backup_to_delete in marked_for_deletion: - ret = pvc_vm.vm_worker_remove_snapshot( + ret = vm.vm_worker_remove_snapshot( zkhandler, None, vm_name, backup_to_delete["snapshot_name"] ) if ret is False: @@ -427,7 +696,7 @@ def worker_cluster_autobackup( with open(autobackup_state_file, "w") as fh: jdump(state_data, fh) - backup_summary[vm] = tracked_backups + backup_summary[vm_detail["name"]] = tracked_backups # Handle automount unmount commands if config["auto_mount_enabled"]: