diff --git a/daemon-common/autobackup.py b/daemon-common/autobackup.py
index 017103e8..e86be7b9 100644
--- a/daemon-common/autobackup.py
+++ b/daemon-common/autobackup.py
@@ -22,15 +22,17 @@
 from datetime import datetime
 from json import load as jload
 from json import dump as jdump
-from os import popen, makedirs, path
+from os import popen, makedirs, path, scandir
 from shutil import rmtree
 from subprocess import run, PIPE
+from time import time
 
+from daemon_lib.common import run_os_command
 from daemon_lib.config import get_autobackup_configuration
 from daemon_lib.celery import start, fail, log_info, log_err, update, finish
 
-import daemon_lib.ceph as pvc_ceph
-import daemon_lib.vm as pvc_vm
+import daemon_lib.ceph as ceph
+import daemon_lib.vm as vm
 
 
 def send_execution_failure_report(
@@ -43,7 +45,7 @@ def send_execution_failure_report(
     from socket import gethostname
 
     log_message = f"Sending email failure report to {', '.join(recipients)}"
-    log_info(log_message)
+    log_info(celery_conf[0], log_message)
     update(
         celery_conf[0],
         log_message,
@@ -92,7 +94,7 @@ def send_execution_summary_report(
     from socket import gethostname
 
     log_message = f"Sending email summary report to {', '.join(recipients)}"
-    log_info(log_message)
+    log_info(celery_conf[0], log_message)
     update(
         celery_conf[0],
         log_message,
@@ -124,9 +126,9 @@ def send_execution_summary_report(
     )
     email.append("")
 
-    for vm in summary.keys():
-        email.append(f"VM: {vm}:")
-        for backup in summary[vm]:
+    for vm_name in summary.keys():
+        email.append(f"VM: {vm_name}:")
+        for backup in summary[vm_name]:
             datestring = backup.get("datestring")
             backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
             if backup.get("result", False):
@@ -134,7 +136,7 @@ def send_execution_summary_report(
                 email.append(
                     f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
                 )
                 email.append(
-                    f" Backup contains {len(backup.get('backup_files'))} files totaling {pvc_ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)"
+                    f" Backup contains {len(backup.get('backup_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)"
                 )
             else:
                 email.append(
@@ -180,7 +182,7 @@ def worker_cluster_autobackup(
     autobackup_start_time = datetime.now()
-    retcode, vm_list = pvc_vm.get_list(zkhandler)
+    retcode, vm_list = vm.get_list(zkhandler)
     if not retcode:
         error_message = f"Failed to fetch VM list: {vm_list}"
         log_err(celery, error_message)
         fail(celery, error_message)
         return False
@@ -193,16 +195,25 @@ def worker_cluster_autobackup(
         fail(celery, error_message)
         return False
 
+    backup_suffixed_path = (
+        f"{config['backup_root_path']}/{config['backup_root_suffix']}"
+    )
+    if not path.exists(backup_suffixed_path):
+        makedirs(backup_suffixed_path)
+
+    full_interval = config["backup_schedule"]["full_interval"]
+    full_retention = config["backup_schedule"]["full_retention"]
+
     backup_vms = list()
-    for vm in vm_list:
-        vm_tag_names = [t["name"] for t in vm["tags"]]
+    for vm_detail in vm_list:
+        vm_tag_names = [t["name"] for t in vm_detail["tags"]]
         matching_tags = (
             True
             if len(set(vm_tag_names).intersection(set(config["backup_tags"]))) > 0
             else False
         )
         if matching_tags:
-            backup_vms.append(vm)
+            backup_vms.append(vm_detail)
 
     if len(backup_vms) < 1:
         message = "Found no VMs tagged for autobackup."
@@ -217,13 +228,40 @@ def worker_cluster_autobackup(
     if config["auto_mount_enabled"]:
         total_stages += len(config["mount_cmds"])
         total_stages += len(config["unmount_cmds"])
-    for vm in backup_vms:
-        total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"])
-        total_stages += 2 + 1 + 2 + 2 + 3 * total_disks
+
+    for vm_detail in backup_vms:
+        total_disks = len([d for d in vm_detail["disks"] if d["type"] == "rbd"])
+        total_stages += 2 + (2 * total_disks)
+
+        vm_name = vm_detail["name"]
+        vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
+        autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
+        if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
+            # There are no existing backups so the list is empty
+            state_data = dict()
+            tracked_backups = list()
+        else:
+            with open(autobackup_state_file) as fh:
+                state_data = jload(fh)
+            tracked_backups = state_data["tracked_backups"]
+
+        full_backups = [b for b in tracked_backups if b["type"] == "full"]
+        if len(full_backups) > 0:
+            last_full_backup = full_backups[0]
+            last_full_backup_idx = tracked_backups.index(last_full_backup)
+            if force_full or last_full_backup_idx >= full_interval - 1:
+                this_backup_retain_snapshot = True
+            else:
+                this_backup_retain_snapshot = False
+        else:
+            # The very first backup must be full to start the tree
+            this_backup_retain_snapshot = True
+        if this_backup_retain_snapshot:
+            total_stages += total_disks
 
     log_info(
         celery,
-        f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join(vm_list)}",
+        f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join([b['name'] for b in backup_vms])}",
     )
 
     # Handle automount mount commands
@@ -257,17 +295,9 @@ def worker_cluster_autobackup(
             return False
 
     # Execute the backup: take a snapshot, then export the snapshot
-    backup_suffixed_path = (
-        f"{config['backup_root_path']}/{config['backup_root_suffix']}"
-    )
-    if not path.exists(backup_suffixed_path):
-        makedirs(backup_suffixed_path)
-
-    full_interval = config["backup_schedule"]["full_interval"]
-    full_retention = config["backup_schedule"]["full_retention"]
-
-    for vm in backup_vms:
-        vm_name = vm["name"]
+    for vm_detail in backup_vms:
+        vm_name = vm_detail["name"]
+        dom_uuid = vm_detail["uuid"]
         vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
         autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
         if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
@@ -301,60 +331,35 @@ def worker_cluster_autobackup(
         datestring = now.strftime("%Y%m%d%H%M%S")
         snapshot_name = f"autobackup_{datestring}"
 
-        # Take the snapshot
-        ret = pvc_vm.vm_worker_create_snapshot(
-            zkhandler,
-            celery,
-            vm_name,
-            snapshot_name=snapshot_name,
-            override_current_stage=current_stage,
-            override_total_stages=total_stages,
-        )
-        if ret is False:
-            error_message = f"Failed to create backup snapshot '{snapshot_name}'"
-            log_err(celery, error_message)
-            send_execution_failure_report(
-                (celery, current_stage, total_stages),
-                config,
-                recipients=email_recipients,
-                error=error_message,
-            )
-            return False
+        # Take the VM snapshot (vm.vm_worker_create_snapshot)
+        snap_list = list()
 
-        # Export the snapshot
-        ret = pvc_vm.vm_worker_export_snapshot(
-            zkhandler,
-            celery,
-            vm_name,
-            snapshot_name,
-            backup_suffixed_path,
-            incremental_parent=this_backup_incremental_parent,
-            override_current_stage=current_stage,
-            override_total_stages=total_stages,
-        )
-        if ret is False:
-            error_message = f"Failed to export backup snapshot '{snapshot_name}'"
-            log_err(celery, error_message)
-            send_execution_failure_report(
-                (celery, current_stage, total_stages),
-                config,
-                recipients=email_recipients,
-                error=error_message,
-            )
-            return False
+        def cleanup_failure():
+            for snapshot in snap_list:
+                rbd, snapshot_name = snapshot.split("@")
+                pool, volume = rbd.split("/")
+                # We capture no output here, because if this fails too we're in a deep
+                # error chain and will just ignore it
+                ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name)
 
-        # Clean up the snapshot
-        if not this_backup_retain_snapshot:
-            ret = pvc_vm.vm_worker_remove_snapshot(
-                zkhandler,
+        rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
+
+        for rbd in rbd_list:
+            current_stage += 1
+            update(
                 celery,
-                vm_name,
-                snapshot_name,
-                override_current_stage=current_stage,
-                override_total_stages=total_stages,
+                f"[VM {vm_name}] Creating RBD snapshot of {rbd}",
+                current=current_stage,
+                total=total_stages,
             )
-            if ret is False:
-                error_message = f"Failed to remove backup snapshot '{snapshot_name}'"
+
+            pool, volume = rbd.split("/")
+            ret, msg = ceph.add_snapshot(
+                zkhandler, pool, volume, snapshot_name, zk_only=False
+            )
+            if not ret:
+                cleanup_failure()
+                error_message = msg.replace("ERROR: ", "")
                 log_err(celery, error_message)
                 send_execution_failure_report(
                     (celery, current_stage, total_stages),
@@ -362,10 +367,274 @@ def worker_cluster_autobackup(
                     recipients=email_recipients,
                     error=error_message,
                 )
+                fail(celery, error_message)
                 return False
+            else:
+                snap_list.append(f"{pool}/{volume}@{snapshot_name}")
+
+        current_stage += 1
+        update(
+            celery,
+            f"[VM {vm_name}] Creating VM configuration snapshot",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        # Get the current timestamp
+        tstart = time()
+
+        # Get the current domain XML
+        vm_config = zkhandler.read(("domain.xml", dom_uuid))
+
+        # Add the snapshot entry to Zookeeper
+        zkhandler.write(
+            [
+                (
+                    (
+                        "domain.snapshots",
+                        dom_uuid,
+                        "domain_snapshot.name",
+                        snapshot_name,
+                    ),
+                    snapshot_name,
+                ),
+                (
+                    (
+                        "domain.snapshots",
+                        dom_uuid,
+                        "domain_snapshot.timestamp",
+                        snapshot_name,
+                    ),
+                    tstart,
+                ),
+                (
+                    (
+                        "domain.snapshots",
+                        dom_uuid,
+                        "domain_snapshot.xml",
+                        snapshot_name,
+                    ),
+                    vm_config,
+                ),
+                (
+                    (
+                        "domain.snapshots",
+                        dom_uuid,
+                        "domain_snapshot.rbd_snapshots",
+                        snapshot_name,
+                    ),
+                    ",".join(snap_list),
+                ),
+            ]
+        )
+
+        # Export the snapshot (vm.vm_worker_export_snapshot)
+        export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images"
+
+        try:
+            makedirs(export_target_path)
+        except Exception as e:
+            error_message = (
+                f"[VM {vm_name}] Failed to create target directory '{export_target_path}': {e}"
+            )
+            log_err(celery, error_message)
+            send_execution_failure_report(
+                (celery, current_stage, total_stages),
+                config,
+                recipients=email_recipients,
+                error=error_message,
+            )
+            fail(celery, error_message)
+            return False
+
+        def export_cleanup():
+            from shutil import rmtree
+
+            rmtree(f"{backup_suffixed_path}/{vm_name}/{snapshot_name}")
+
+        export_type = (
+            "incremental" if this_backup_incremental_parent is not None else "full"
+        )
+
+        # Set the export filetype
+        if this_backup_incremental_parent is not None:
+            export_fileext = "rbddiff"
         else:
-            total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"])
-            current_stage += 2 + total_disks
+            export_fileext = "rbdimg"
+
+        snapshot_volumes = list()
+        for rbdsnap in snap_list:
+            pool, _volume = rbdsnap.split("/")
+            volume, name = _volume.split("@")
+            ret, snapshots = ceph.get_list_snapshot(
+                zkhandler, pool, volume, limit=name, is_fuzzy=False
+            )
+            if ret:
+                snapshot_volumes += snapshots
+
+        export_files = list()
+        for snapshot_volume in snapshot_volumes:
+            snap_pool = snapshot_volume["pool"]
+            snap_volume = snapshot_volume["volume"]
+            snap_snapshot_name = snapshot_volume["snapshot"]
+            snap_size = snapshot_volume["stats"]["size"]
+            snap_str = f"{snap_pool}/{snap_volume}@{snap_snapshot_name}"
+
+            current_stage += 1
+            update(
+                celery,
+                f"[VM {vm_name}] Exporting {snap_str}",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            if this_backup_incremental_parent is not None:
+                retcode, stdout, stderr = run_os_command(
+                    f"rbd export-diff --from-snap {this_backup_incremental_parent} {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}"
+                )
+                if retcode:
+                    error_message = (
+                        f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'"
+                    )
+                    log_err(celery, error_message)
+                    send_execution_failure_report(
+                        (celery, current_stage, total_stages),
+                        config,
+                        recipients=email_recipients,
+                        error=error_message,
+                    )
+                    fail(celery, error_message)
+                    return False
+                else:
+                    export_files.append(
+                        (
+                            f"images/{snap_pool}.{snap_volume}.{export_fileext}",
+                            snap_size,
+                        )
+                    )
+            else:
+                retcode, stdout, stderr = run_os_command(
+                    f"rbd export --export-format 2 {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}"
+                )
+                if retcode:
+                    error_message = (
+                        f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'"
+                    )
+                    log_err(celery, error_message)
+                    send_execution_failure_report(
+                        (celery, current_stage, total_stages),
+                        config,
+                        recipients=email_recipients,
+                        error=error_message,
+                    )
+                    fail(celery, error_message)
+                    return False
+                else:
+                    export_files.append(
+                        (
+                            f"images/{snap_pool}.{snap_volume}.{export_fileext}",
+                            snap_size,
+                        )
+                    )
+
+        current_stage += 1
+        update(
+            celery,
+            f"[VM {vm_name}] Writing snapshot details",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        def get_dir_size(pathname):
+            total = 0
+            with scandir(pathname) as it:
+                for entry in it:
+                    if entry.is_file():
+                        total += entry.stat().st_size
+                    elif entry.is_dir():
+                        total += get_dir_size(entry.path)
+            return total
+
+        export_files_size = get_dir_size(export_target_path)
+
+        export_details = {
+            "type": export_type,
+            "datestring": datestring,
+            "snapshot_name": snapshot_name,
+            "incremental_parent": this_backup_incremental_parent,
+            "vm_detail": vm_detail,
+            "export_files": export_files,
+            "export_size_bytes": export_files_size,
+        }
+        try:
+            with open(
+                f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json", "w"
+            ) as fh:
+                jdump(export_details, fh)
+        except Exception as e:
+            error_message = (
+                f"[VM {vm_name}] Failed to export configuration snapshot: {e}"
+            )
+            log_err(celery, error_message)
+            send_execution_failure_report(
+                (celery, current_stage, total_stages),
+                config,
+                recipients=email_recipients,
+                error=error_message,
+            )
+            fail(celery, error_message)
+            return False
+
+        # Clean up the snapshot (vm.vm_worker_remove_snapshot)
+        if not this_backup_retain_snapshot:
+            for snap in snap_list:
+                current_stage += 1
+                update(
+                    celery,
+                    f"[VM {vm_name}] Removing {snap}",
+                    current=current_stage,
+                    total=total_stages,
+                )
+
+                rbd, name = snap.split("@")
+                pool, volume = rbd.split("/")
+                ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name)
+                if not ret:
+                    error_message = msg.replace("ERROR: ", "")
+                    log_err(celery, error_message)
+                    send_execution_failure_report(
+                        (celery, current_stage, total_stages),
+                        config,
+                        recipients=email_recipients,
+                        error=error_message,
+                    )
+                    fail(celery, error_message)
+                    return False
+
+            current_stage += 1
+            update(
+                celery,
+                f"[VM {vm_name}] Removing VM configuration snapshot",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            ret = zkhandler.delete(
+                ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
+            )
+            if not ret:
+                error_message = (
+                    f"[VM {vm_name}] Failed to remove snapshot from Zookeeper"
+                )
+                log_err(celery, error_message)
+                send_execution_failure_report(
+                    (celery, current_stage, total_stages),
+                    config,
+                    recipients=email_recipients,
+                    error=error_message,
+                )
+                fail(celery, error_message)
+                return False
 
         current_stage += 1
         update(
@@ -406,7 +675,7 @@ def worker_cluster_autobackup(
         )
 
         for backup_to_delete in marked_for_deletion:
-            ret = pvc_vm.vm_worker_remove_snapshot(
+            ret = vm.vm_worker_remove_snapshot(
                 zkhandler, None, vm_name, backup_to_delete["snapshot_name"]
             )
             if ret is False:
@@ -427,7 +696,7 @@ def worker_cluster_autobackup(
         with open(autobackup_state_file, "w") as fh:
             jdump(state_data, fh)
 
-        backup_summary[vm] = tracked_backups
+        backup_summary[vm_detail["name"]] = tracked_backups
 
     # Handle automount unmount commands
     if config["auto_mount_enabled"]: