Simplify Celery event handling

It was far too cumbersome to report every possible stage here in a
consistent way. Realistically, this command will be run silently from
cron 99.95% of the time, so all this overcomplexity to handle individual
Celery state updates just isn't worth it.
This commit is contained in:
Joshua Boniface 2024-08-25 21:41:52 -04:00
parent 10de85cce3
commit f57b8d4a15
2 changed files with 63 additions and 178 deletions

View File

@ -2190,9 +2190,11 @@ def cli_vm_autobackup(email_report, force_full_flag, wait_flag, cron_flag):
This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines
how often backups are taken. Backup format (full/incremental) and retention is based only on the number of how often backups are taken. Backup format (full/incremental) and retention is based only on the number of
recorded backups, not on the time interval between them. Backups taken manually outside of the "autobackup" recorded backups, not on the time interval between them. Exports taken manually outside of the "autobackup"
command are not counted towards the format or retention of autobackups. command are not counted towards the format or retention of autobackups.
WARNING: Running this command manually will interfere with the schedule! Do not run manually except for testing.
The actual details of the autobackup, including retention policies, full-vs-incremental, pre- and post- run The actual details of the autobackup, including retention policies, full-vs-incremental, pre- and post- run
mounting/unmounting commands, etc. are defined in the main PVC configuration file `/etc/pvc/pvc.conf`. See mounting/unmounting commands, etc. are defined in the main PVC configuration file `/etc/pvc/pvc.conf`. See
the sample configuration for more details. the sample configuration for more details.

View File

@ -132,14 +132,14 @@ def send_execution_summary_report(
backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S") backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
if backup.get("result", False): if backup.get("result", False):
email.append( email.append(
f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}" f" {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {backup.get('snapshot_name')}, type {backup.get('type', 'unknown')}"
) )
email.append( email.append(
f" Backup contains {len(backup.get('export_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('export_size_bytes', 0))} ({backup.get('export_size_bytes', 0)} bytes)" f" Backup contains {len(backup.get('export_files'))} files totaling {ceph.format_bytes_tohuman(backup.get('export_size_bytes', 0))} ({backup.get('export_size_bytes', 0)} bytes)"
) )
else: else:
email.append( email.append(
f" {backup_date}: Failure in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}" f" {backup_date}: Failure in {backup.get('runtime_secs', 0)} seconds, ID {backup.get('snapshot_name')}, type {backup.get('type', 'unknown')}"
) )
email.append(f" {backup.get('result_message')}") email.append(f" {backup.get('result_message')}")
@ -150,14 +150,10 @@ def send_execution_summary_report(
log_err(f"Failed to send report email: {e}") log_err(f"Failed to send report email: {e}")
def run_vm_backup( def run_vm_backup(zkhandler, celery, config, vm_detail, force_full=False):
zkhandler, celery, current_stage, total_stages, config, vm_detail, force_full=False
):
vm_name = vm_detail["name"] vm_name = vm_detail["name"]
dom_uuid = vm_detail["uuid"] dom_uuid = vm_detail["uuid"]
backup_suffixed_path = ( backup_suffixed_path = f"{config['backup_root_path']}{config['backup_root_suffix']}"
f"{config['backup_root_path']}/{config['backup_root_suffix']}"
)
vm_backup_path = f"{backup_suffixed_path}/{vm_name}" vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
autobackup_state_file = f"{vm_backup_path}/.autobackup.json" autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
full_interval = config["backup_schedule"]["full_interval"] full_interval = config["backup_schedule"]["full_interval"]
@ -183,7 +179,7 @@ def run_vm_backup(
this_backup_incremental_parent = None this_backup_incremental_parent = None
this_backup_retain_snapshot = True this_backup_retain_snapshot = True
else: else:
this_backup_incremental_parent = last_full_backup["datestring"] this_backup_incremental_parent = last_full_backup["snapshot_name"]
this_backup_retain_snapshot = False this_backup_retain_snapshot = False
else: else:
# The very first ackup must be full to start the tree # The very first ackup must be full to start the tree
@ -210,9 +206,13 @@ def run_vm_backup(
backup_json_file = ( backup_json_file = (
f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json" f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/snapshot.json"
) )
try:
with open(backup_json_file) as fh: with open(backup_json_file) as fh:
backup_json = jload(fh) backup_json = jload(fh)
tracked_backups.insert(0, backup_json) tracked_backups.insert(0, backup_json)
except Exception as e:
log_err(celery, f"Could not open export JSON: {e}")
return list()
state_data["tracked_backups"] = tracked_backups state_data["tracked_backups"] = tracked_backups
with open(autobackup_state_file, "w") as fh: with open(autobackup_state_file, "w") as fh:
@ -255,22 +255,8 @@ def run_vm_backup(
ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name)
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
final_stage = (
current_stage
+ 5
+ len(rbd_list)
+ (len(rbd_list) if this_backup_retain_snapshot else 0)
)
for rbd in rbd_list: for rbd in rbd_list:
current_stage += 1
update(
celery,
f"[{vm_name}] Creating RBD snapshot of {rbd}",
current=current_stage,
total=total_stages,
)
pool, volume = rbd.split("/") pool, volume = rbd.split("/")
ret, msg = ceph.add_snapshot( ret, msg = ceph.add_snapshot(
zkhandler, pool, volume, snapshot_name, zk_only=False zkhandler, pool, volume, snapshot_name, zk_only=False
@ -286,24 +272,9 @@ def run_vm_backup(
if failure: if failure:
error_message = (f"[{vm_name}] Error in snapshot export, skipping",) error_message = (f"[{vm_name}] Error in snapshot export, skipping",)
current_stage = final_stage
write_backup_summary(message=error_message) write_backup_summary(message=error_message)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
update( return tracked_backups
celery,
error_message,
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
current_stage += 1
update(
celery,
f"[{vm_name}] Creating VM configuration snapshot",
current=current_stage,
total=total_stages,
)
# Get the current domain XML # Get the current domain XML
vm_config = zkhandler.read(("domain.xml", dom_uuid)) vm_config = zkhandler.read(("domain.xml", dom_uuid))
@ -351,17 +322,10 @@ def run_vm_backup(
) )
if not ret: if not ret:
error_message = (f"[{vm_name}] Error in snapshot export, skipping",) error_message = (f"[{vm_name}] Error in snapshot export, skipping",)
current_stage = final_stage
log_err(celery, error_message) log_err(celery, error_message)
write_backup_summary(message=error_message) write_backup_summary(message=error_message)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
update( return tracked_backups
celery,
error_message,
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
# Export the snapshot (vm.vm_worker_export_snapshot) # Export the snapshot (vm.vm_worker_export_snapshot)
export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images" export_target_path = f"{backup_suffixed_path}/{vm_name}/{snapshot_name}/images"
@ -372,17 +336,8 @@ def run_vm_backup(
error_message = ( error_message = (
f"[{vm_name}] Failed to create target directory '{export_target_path}': {e}", f"[{vm_name}] Failed to create target directory '{export_target_path}': {e}",
) )
current_stage = final_stage
log_err(celery, error_message) log_err(celery, error_message)
write_backup_summary(message=error_message) return tracked_backups
tracked_backups = update_tracked_backups()
update(
celery,
f"[{vm_name}] Error in snapshot export, skipping",
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
def export_cleanup(): def export_cleanup():
from shutil import rmtree from shutil import rmtree
@ -411,15 +366,6 @@ def run_vm_backup(
snap_volume = snapshot_volume["volume"] snap_volume = snapshot_volume["volume"]
snap_snapshot_name = snapshot_volume["snapshot"] snap_snapshot_name = snapshot_volume["snapshot"]
snap_size = snapshot_volume["stats"]["size"] snap_size = snapshot_volume["stats"]["size"]
snap_str = f"{snap_pool}/{snap_volume}@{snap_snapshot_name}"
current_stage += 1
update(
celery,
f"[{vm_name}] Exporting RBD snapshot {snap_str}",
current=current_stage,
total=total_stages,
)
if this_backup_incremental_parent is not None: if this_backup_incremental_parent is not None:
retcode, stdout, stderr = run_os_command( retcode, stdout, stderr = run_os_command(
@ -457,25 +403,10 @@ def run_vm_backup(
) )
if failure: if failure:
current_stage = final_stage
log_err(celery, error_message) log_err(celery, error_message)
write_backup_summary(message=error_message) write_backup_summary(message=error_message)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
update( return tracked_backups
celery,
error_message,
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
current_stage += 1
update(
celery,
f"[{vm_name}] Writing snapshot details",
current=current_stage,
total=total_stages,
)
def get_dir_size(pathname): def get_dir_size(pathname):
total = 0 total = 0
@ -492,29 +423,14 @@ def run_vm_backup(
ret, e = write_backup_summary(success=True) ret, e = write_backup_summary(success=True)
if not ret: if not ret:
error_message = (f"[{vm_name}] Failed to export configuration snapshot: {e}",) error_message = (f"[{vm_name}] Failed to export configuration snapshot: {e}",)
current_stage = final_stage
log_err(celery, error_message) log_err(celery, error_message)
write_backup_summary(message=error_message) write_backup_summary(message=error_message)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
update( return tracked_backups
celery,
error_message,
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
# Clean up the snapshot (vm.vm_worker_remove_snapshot) # Clean up the snapshot (vm.vm_worker_remove_snapshot)
if not this_backup_retain_snapshot: if not this_backup_retain_snapshot:
for snap in snap_list: for snap in snap_list:
current_stage += 1
update(
celery,
f"[{vm_name}] Removing RBD snapshot {snap}",
current=current_stage,
total=total_stages,
)
rbd, name = snap.split("@") rbd, name = snap.split("@")
pool, volume = rbd.split("/") pool, volume = rbd.split("/")
ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name) ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name)
@ -524,25 +440,10 @@ def run_vm_backup(
break break
if failure: if failure:
current_stage = final_stage
log_err(celery, error_message) log_err(celery, error_message)
write_backup_summary(message=error_message) write_backup_summary(message=error_message)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
update( return tracked_backups
celery,
error_message,
current=current_stage,
total=total_stages,
)
return tracked_backups, current_stage
current_stage += 1
update(
celery,
f"[{vm_name}] Removing VM configuration snapshot",
current=current_stage,
total=total_stages,
)
ret = zkhandler.delete( ret = zkhandler.delete(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
@ -551,14 +452,6 @@ def run_vm_backup(
error_message = (f"[{vm_name}] Failed to remove VM snapshot; continuing",) error_message = (f"[{vm_name}] Failed to remove VM snapshot; continuing",)
log_err(celery, error_message) log_err(celery, error_message)
current_stage += 1
update(
celery,
f"Finding obsolete incremental backups for '{vm_name}'",
current=current_stage,
total=total_stages,
)
marked_for_deletion = list() marked_for_deletion = list()
# Find any full backups that are expired # Find any full backups that are expired
found_full_count = 0 found_full_count = 0
@ -570,19 +463,11 @@ def run_vm_backup(
# Find any incremental backups that depend on marked parents # Find any incremental backups that depend on marked parents
for backup in tracked_backups: for backup in tracked_backups:
if backup["type"] == "incremental" and backup["incremental_parent"] in [ if backup["type"] == "incremental" and backup["incremental_parent"] in [
b["datestring"] for b in marked_for_deletion b["snapshot_name"] for b in marked_for_deletion
]: ]:
marked_for_deletion.append(backup) marked_for_deletion.append(backup)
current_stage += 1
if len(marked_for_deletion) > 0: if len(marked_for_deletion) > 0:
update(
celery,
f"Cleaning up aged out backups for '{vm_name}'",
current=current_stage,
total=total_stages,
)
for backup_to_delete in marked_for_deletion: for backup_to_delete in marked_for_deletion:
ret = vm.vm_worker_remove_snapshot( ret = vm.vm_worker_remove_snapshot(
zkhandler, None, vm_name, backup_to_delete["snapshot_name"] zkhandler, None, vm_name, backup_to_delete["snapshot_name"]
@ -594,15 +479,8 @@ def run_vm_backup(
rmtree(f"{vm_backup_path}/{backup_to_delete['snapshot_name']}") rmtree(f"{vm_backup_path}/{backup_to_delete['snapshot_name']}")
tracked_backups.remove(backup_to_delete) tracked_backups.remove(backup_to_delete)
current_stage += 1
update(
celery,
"Updating tracked backups",
current=current_stage,
total=total_stages,
)
tracked_backups = update_tracked_backups() tracked_backups = update_tracked_backups()
return tracked_backups, current_stage return tracked_backups
def worker_cluster_autobackup( def worker_cluster_autobackup(
@ -649,9 +527,7 @@ def worker_cluster_autobackup(
fail(celery, error_message) fail(celery, error_message)
return False return False
backup_suffixed_path = ( backup_suffixed_path = f"{config['backup_root_path']}{config['backup_root_suffix']}"
f"{config['backup_root_path']}/{config['backup_root_suffix']}"
)
if not path.exists(backup_suffixed_path): if not path.exists(backup_suffixed_path):
makedirs(backup_suffixed_path) makedirs(backup_suffixed_path)
@ -682,35 +558,7 @@ def worker_cluster_autobackup(
total_stages += len(config["mount_cmds"]) total_stages += len(config["mount_cmds"])
total_stages += len(config["unmount_cmds"]) total_stages += len(config["unmount_cmds"])
for vm_detail in backup_vms: total_stages += len(backup_vms)
total_disks = len([d for d in vm_detail["disks"] if d["type"] == "rbd"])
total_stages += 2 + (2 * total_disks)
vm_name = vm_detail["name"]
vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
# There are no existing backups so the list is empty
state_data = dict()
tracked_backups = list()
else:
with open(autobackup_state_file) as fh:
state_data = jload(fh)
tracked_backups = state_data["tracked_backups"]
full_backups = [b for b in tracked_backups if b["type"] == "full"]
if len(full_backups) > 0:
last_full_backup = full_backups[0]
last_full_backup_idx = tracked_backups.index(last_full_backup)
if force_full or last_full_backup_idx >= full_interval - 1:
this_backup_retain_snapshot = True
else:
this_backup_retain_snapshot = False
else:
# The very first ackup must be full to start the tree
this_backup_retain_snapshot = True
if this_backup_retain_snapshot:
total_stages += total_disks
log_info( log_info(
celery, celery,
@ -749,11 +597,46 @@ def worker_cluster_autobackup(
# Execute the backup: take a snapshot, then export the snapshot # Execute the backup: take a snapshot, then export the snapshot
for vm_detail in backup_vms: for vm_detail in backup_vms:
summary, current_stage = run_vm_backup( vm_backup_path = f"{backup_suffixed_path}/{vm_detail['name']}"
autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
# There are no existing backups so the list is empty
state_data = dict()
tracked_backups = list()
else:
with open(autobackup_state_file) as fh:
state_data = jload(fh)
tracked_backups = state_data["tracked_backups"]
full_backups = [b for b in tracked_backups if b["type"] == "full"]
if len(full_backups) > 0:
last_full_backup = full_backups[0]
last_full_backup_idx = tracked_backups.index(last_full_backup)
if force_full:
this_backup_incremental_parent = None
elif last_full_backup_idx >= full_interval - 1:
this_backup_incremental_parent = None
else:
this_backup_incremental_parent = last_full_backup["snapshot_name"]
else:
# The very first ackup must be full to start the tree
this_backup_incremental_parent = None
export_type = (
"incremental" if this_backup_incremental_parent is not None else "full"
)
current_stage += 1
update(
celery,
f"Performing autobackup of VM {vm_detail['name']} ({export_type})",
current=current_stage,
total=total_stages,
)
summary = run_vm_backup(
zkhandler, zkhandler,
celery, celery,
current_stage,
total_stages,
config, config,
vm_detail, vm_detail,
force_full=force_full, force_full=force_full,