Refactor autobackups to implement vm.worker defs

Avoid trying to subcall other Celery worker tasks, as this just gets
very screwy with the stages. Instead reimplement what is needed directly
here. While this does cause a fair bit of code duplication, I believe
the resulting clarity is worthwhile.
This commit is contained in:
Joshua Boniface 2024-08-25 15:18:30 -04:00
parent c0686fc5c7
commit f1668bffcc
1 changed files with 351 additions and 82 deletions

View File

@ -22,15 +22,17 @@
from datetime import datetime from datetime import datetime
from json import load as jload from json import load as jload
from json import dump as jdump from json import dump as jdump
from os import popen, makedirs, path from os import popen, makedirs, path, scandir
from shutil import rmtree from shutil import rmtree
from subprocess import run, PIPE from subprocess import run, PIPE
from time import time
from daemon_lib.common import run_os_command
from daemon_lib.config import get_autobackup_configuration from daemon_lib.config import get_autobackup_configuration
from daemon_lib.celery import start, fail, log_info, log_err, update, finish from daemon_lib.celery import start, fail, log_info, log_err, update, finish
import daemon_lib.ceph as pvc_ceph import daemon_lib.ceph as ceph
import daemon_lib.vm as pvc_vm import daemon_lib.vm as vm
def send_execution_failure_report( def send_execution_failure_report(
@ -43,7 +45,7 @@ def send_execution_failure_report(
from socket import gethostname from socket import gethostname
log_message = f"Sending email failure report to {', '.join(recipients)}" log_message = f"Sending email failure report to {', '.join(recipients)}"
log_info(log_message) log_info(celery_conf[0], log_message)
update( update(
celery_conf[0], celery_conf[0],
log_message, log_message,
@ -92,7 +94,7 @@ def send_execution_summary_report(
from socket import gethostname from socket import gethostname
log_message = f"Sending email summary report to {', '.join(recipients)}" log_message = f"Sending email summary report to {', '.join(recipients)}"
log_info(log_message) log_info(celery_conf[0], log_message)
update( update(
celery_conf[0], celery_conf[0],
log_message, log_message,
@ -124,9 +126,9 @@ def send_execution_summary_report(
) )
email.append("") email.append("")
for vm in summary.keys(): for vm_name in summary.keys():
email.append(f"VM: {vm}:") email.append(f"VM: {vm_name}:")
for backup in summary[vm]: for backup in summary[vm_name]:
datestring = backup.get("datestring") datestring = backup.get("datestring")
backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S") backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
if backup.get("result", False): if backup.get("result", False):
@ -134,7 +136,7 @@ def send_execution_summary_report(
f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}" f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
) )
email.append( email.append(
f" Backup contains {len(backup.get('backup_files'))} files totaling {pvc_ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)" f" Backup contains {len(backup.get('backup_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)"
) )
else: else:
email.append( email.append(
@ -180,7 +182,7 @@ def worker_cluster_autobackup(
autobackup_start_time = datetime.now() autobackup_start_time = datetime.now()
retcode, vm_list = pvc_vm.get_list(zkhandler) retcode, vm_list = vm.get_list(zkhandler)
if not retcode: if not retcode:
error_message = f"Failed to fetch VM list: {vm_list}" error_message = f"Failed to fetch VM list: {vm_list}"
log_err(celery, error_message) log_err(celery, error_message)
@ -193,16 +195,25 @@ def worker_cluster_autobackup(
fail(celery, error_message) fail(celery, error_message)
return False return False
backup_suffixed_path = (
f"{config['backup_root_path']}/{config['backup_root_suffix']}"
)
if not path.exists(backup_suffixed_path):
makedirs(backup_suffixed_path)
full_interval = config["backup_schedule"]["full_interval"]
full_retention = config["backup_schedule"]["full_retention"]
backup_vms = list() backup_vms = list()
for vm in vm_list: for vm_detail in vm_list:
vm_tag_names = [t["name"] for t in vm["tags"]] vm_tag_names = [t["name"] for t in vm_detail["tags"]]
matching_tags = ( matching_tags = (
True True
if len(set(vm_tag_names).intersection(set(config["backup_tags"]))) > 0 if len(set(vm_tag_names).intersection(set(config["backup_tags"]))) > 0
else False else False
) )
if matching_tags: if matching_tags:
backup_vms.append(vm) backup_vms.append(vm_detail)
if len(backup_vms) < 1: if len(backup_vms) < 1:
message = "Found no VMs tagged for autobackup." message = "Found no VMs tagged for autobackup."
@ -217,13 +228,40 @@ def worker_cluster_autobackup(
if config["auto_mount_enabled"]: if config["auto_mount_enabled"]:
total_stages += len(config["mount_cmds"]) total_stages += len(config["mount_cmds"])
total_stages += len(config["unmount_cmds"]) total_stages += len(config["unmount_cmds"])
for vm in backup_vms:
total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"]) for vm_detail in backup_vms:
total_stages += 2 + 1 + 2 + 2 + 3 * total_disks total_disks = len([d for d in vm_detail["disks"] if d["type"] == "rbd"])
total_stages += 2 + (2 * total_disks)
vm_name = vm_detail["name"]
vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
# There are no existing backups so the list is empty
state_data = dict()
tracked_backups = list()
else:
with open(autobackup_state_file) as fh:
state_data = jload(fh)
tracked_backups = state_data["tracked_backups"]
full_backups = [b for b in tracked_backups if b["type"] == "full"]
if len(full_backups) > 0:
last_full_backup = full_backups[0]
last_full_backup_idx = tracked_backups.index(last_full_backup)
if force_full or last_full_backup_idx >= full_interval - 1:
this_backup_retain_snapshot = True
else:
this_backup_retain_snapshot = False
else:
# The very first ackup must be full to start the tree
this_backup_retain_snapshot = True
if this_backup_retain_snapshot:
total_stages += total_disks
log_info( log_info(
celery, celery,
f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join(vm_list)}", f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join([b['name'] for b in backup_vms])}",
) )
# Handle automount mount commands # Handle automount mount commands
@ -257,17 +295,9 @@ def worker_cluster_autobackup(
return False return False
# Execute the backup: take a snapshot, then export the snapshot # Execute the backup: take a snapshot, then export the snapshot
backup_suffixed_path = ( for vm_detail in backup_vms:
f"{config['backup_root_path']}/{config['backup_root_suffix']}" vm_name = vm_detail["name"]
) dom_uuid = vm_detail["uuid"]
if not path.exists(backup_suffixed_path):
makedirs(backup_suffixed_path)
full_interval = config["backup_schedule"]["full_interval"]
full_retention = config["backup_schedule"]["full_retention"]
for vm in backup_vms:
vm_name = vm["name"]
vm_backup_path = f"{backup_suffixed_path}/{vm_name}" vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
autobackup_state_file = f"{vm_backup_path}/.autobackup.json" autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file): if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
@ -301,60 +331,35 @@ def worker_cluster_autobackup(
datestring = now.strftime("%Y%m%d%H%M%S") datestring = now.strftime("%Y%m%d%H%M%S")
snapshot_name = f"autobackup_{datestring}" snapshot_name = f"autobackup_{datestring}"
# Take the snapshot # Take the VM snapshot (vm.vm_worker_create_snapshot)
ret = pvc_vm.vm_worker_create_snapshot( snap_list = list()
zkhandler,
celery,
vm_name,
snapshot_name=snapshot_name,
override_current_stage=current_stage,
override_total_stages=total_stages,
)
if ret is False:
error_message = f"Failed to create backup snapshot '{snapshot_name}'"
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
return False
# Export the snapshot def cleanup_failure():
ret = pvc_vm.vm_worker_export_snapshot( for snapshot in snap_list:
zkhandler, rbd, snapshot_name = snapshot.split("@")
celery, pool, volume = rbd.split("/")
vm_name, # We capture no output here, because if this fails too we're in a deep
snapshot_name, # error chain and will just ignore it
backup_suffixed_path, ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name)
incremental_parent=this_backup_incremental_parent,
override_current_stage=current_stage,
override_total_stages=total_stages,
)
if ret is False:
error_message = f"Failed to export backup snapshot '{snapshot_name}'"
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
return False
# Clean up the snapshot rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
if not this_backup_retain_snapshot:
ret = pvc_vm.vm_worker_remove_snapshot( for rbd in rbd_list:
zkhandler, current_stage += 1
update(
celery, celery,
vm_name, f"[VM {vm_name}] Creating RBD snapshot of {rbd}",
snapshot_name, current=current_stage,
override_current_stage=current_stage, total=total_stages,
override_total_stages=total_stages,
) )
if ret is False:
error_message = f"Failed to remove backup snapshot '{snapshot_name}'" pool, volume = rbd.split("/")
ret, msg = ceph.add_snapshot(
zkhandler, pool, volume, snapshot_name, zk_only=False
)
if not ret:
cleanup_failure()
error_message = msg.replace("ERROR: ", "")
log_err(celery, error_message) log_err(celery, error_message)
send_execution_failure_report( send_execution_failure_report(
(celery, current_stage, total_stages), (celery, current_stage, total_stages),
@ -362,10 +367,274 @@ def worker_cluster_autobackup(
recipients=email_recipients, recipients=email_recipients,
error=error_message, error=error_message,
) )
fail(celery, error_message)
return False return False
else:
snap_list.append(f"{pool}/{volume}@{snapshot_name}")
current_stage += 1
update(
celery,
f"[VM {vm_name}] Creating VM configuration snapshot",
current=current_stage,
total=total_stages,
)
# Get the current timestamp
tstart = time()
# Get the current domain XML
vm_config = zkhandler.read(("domain.xml", dom_uuid))
# Add the snapshot entry to Zookeeper
zkhandler.write(
[
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.name",
snapshot_name,
),
snapshot_name,
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.timestamp",
snapshot_name,
),
tstart,
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.xml",
snapshot_name,
),
vm_config,
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.rbd_snapshots",
snapshot_name,
),
",".join(snap_list),
),
]
)
# Export the snapshot (vm.vm_worker_export_snapshot)
export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images"
try:
makedirs(export_target_path)
except Exception as e:
error_message = (
f"[VM {vm_name}] Failed to create target directory '{export_target_path}': {e}",
)
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
def export_cleanup():
from shutil import rmtree
rmtree(f"{backup_suffixed_path}/{vm_name}/{snapshot_name}")
export_type = (
"incremental" if this_backup_incremental_parent is not None else "full"
)
# Set the export filetype
if this_backup_incremental_parent is not None:
export_fileext = "rbddiff"
else: else:
total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"]) export_fileext = "rbdimg"
current_stage += 2 + total_disks
snapshot_volumes = list()
for rbdsnap in snap_list:
pool, _volume = rbdsnap.split("/")
volume, name = _volume.split("@")
ret, snapshots = ceph.get_list_snapshot(
zkhandler, pool, volume, limit=name, is_fuzzy=False
)
if ret:
snapshot_volumes += snapshots
export_files = list()
for snapshot_volume in snapshot_volumes:
snap_pool = snapshot_volume["pool"]
snap_volume = snapshot_volume["volume"]
snap_snapshot_name = snapshot_volume["snapshot"]
snap_size = snapshot_volume["stats"]["size"]
snap_str = f"{snap_pool}/{snap_volume}@{snap_snapshot_name}"
current_stage += 1
update(
celery,
f"[VM {vm_name}] Exporting {snap_str}",
current=current_stage,
total=total_stages,
)
if this_backup_incremental_parent is not None:
retcode, stdout, stderr = run_os_command(
f"rbd export-diff --from-snap {this_backup_incremental_parent} {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}"
)
if retcode:
error_message = (
f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'",
)
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
else:
export_files.append(
(
f"images/{snap_pool}.{snap_volume}.{export_fileext}",
snap_size,
)
)
else:
retcode, stdout, stderr = run_os_command(
f"rbd export --export-format 2 {snap_pool}/{snap_volume}@{snap_snapshot_name} {export_target_path}/{snap_pool}.{snap_volume}.{export_fileext}"
)
if retcode:
error_message = (
f"[VM {vm_name}] Failed to export snapshot for volume(s) '{snap_pool}/{snap_volume}'",
)
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
else:
export_files.append(
(
f"images/{snap_pool}.{snap_volume}.{export_fileext}",
snap_size,
)
)
current_stage += 1
update(
celery,
f"[VM {vm_name}] Writing snapshot details",
current=current_stage,
total=total_stages,
)
def get_dir_size(pathname):
total = 0
with scandir(pathname) as it:
for entry in it:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += get_dir_size(entry.path)
return total
export_files_size = get_dir_size(export_target_path)
export_details = {
"type": export_type,
"datestring": datestring,
"snapshot_name": snapshot_name,
"incremental_parent": this_backup_incremental_parent,
"vm_detail": vm_detail,
"export_files": export_files,
"export_size_bytes": export_files_size,
}
try:
with open(
f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json", "w"
) as fh:
jdump(export_details, fh)
except Exception as e:
error_message = (
f"[VM {vm_name}] Failed to export configuration snapshot: {e}",
)
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
# Clean up the snapshot (vm.vm_worker_remove_snapshot)
if not this_backup_retain_snapshot:
for snap in snap_list:
current_stage += 1
update(
celery,
f"[VM {vm_name}] Removing {snap}",
current=current_stage,
total=total_stages,
)
rbd, name = snap.split("@")
pool, volume = rbd.split("/")
ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name)
if not ret:
error_message = msg.replace("ERROR: ", "")
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
current_stage += 1
update(
celery,
f"[VM {vm_name}] Removing VM configuration snapshot",
current=current_stage,
total=total_stages,
)
ret = zkhandler.delete(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
)
if not ret:
error_message = (
f"[VM {vm_name}] Failed to remove snapshot from Zookeeper",
)
log_err(celery, error_message)
send_execution_failure_report(
(celery, current_stage, total_stages),
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
current_stage += 1 current_stage += 1
update( update(
@ -406,7 +675,7 @@ def worker_cluster_autobackup(
) )
for backup_to_delete in marked_for_deletion: for backup_to_delete in marked_for_deletion:
ret = pvc_vm.vm_worker_remove_snapshot( ret = vm.vm_worker_remove_snapshot(
zkhandler, None, vm_name, backup_to_delete["snapshot_name"] zkhandler, None, vm_name, backup_to_delete["snapshot_name"]
) )
if ret is False: if ret is False:
@ -427,7 +696,7 @@ def worker_cluster_autobackup(
with open(autobackup_state_file, "w") as fh: with open(autobackup_state_file, "w") as fh:
jdump(state_data, fh) jdump(state_data, fh)
backup_summary[vm] = tracked_backups backup_summary[vm_detail["name"]] = tracked_backups
# Handle automount unmount commands # Handle automount unmount commands
if config["auto_mount_enabled"]: if config["auto_mount_enabled"]: