diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 823e9292..b74d5f9a 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -3194,7 +3194,23 @@ class API_VM_Snapshot(Resource): id: Message """ snapshot_name = reqargs.get("snapshot_name", None) - return api_helper.create_vm_snapshot(vm, snapshot_name=snapshot_name) + + task = run_celery_task( + "vm.create_snapshot", + domain=vm, + snapshot_name=snapshot_name, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.create_snapshot", + "run_on": get_primary_node(), + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) @RequestParser( [ @@ -3236,7 +3252,23 @@ class API_VM_Snapshot(Resource): id: Message """ snapshot_name = reqargs.get("snapshot_name", None) - return api_helper.remove_vm_snapshot(vm, snapshot_name) + + task = run_celery_task( + "vm.remove_snapshot", + domain=vm, + snapshot_name=snapshot_name, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.remove_snapshot", + "run_on": get_primary_node(), + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) api.add_resource(API_VM_Snapshot, "/vm//snapshot") @@ -3284,7 +3316,23 @@ class API_VM_Snapshot_Rollback(Resource): id: Message """ snapshot_name = reqargs.get("snapshot_name", None) - return api_helper.rollback_vm_snapshot(vm, snapshot_name) + + task = run_celery_task( + "vm.rollback_snapshot", + domain=vm, + snapshot_name=snapshot_name, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.rollback_snapshot", + "run_on": get_primary_node(), + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) api.add_resource(API_VM_Snapshot_Rollback, "/vm//snapshot/rollback") @@ -3354,8 +3402,24 @@ class API_VM_Snapshot_Export(Resource): snapshot_name = reqargs.get("snapshot_name", None) export_path = reqargs.get("export_path", None) incremental_parent = reqargs.get("incremental_parent", None) - return api_helper.export_vm_snapshot( - vm, snapshot_name, export_path, incremental_parent + + task = run_celery_task( + "vm.export_snapshot", + domain=vm, + snapshot_name=snapshot_name, + export_path=export_path, + incremental_parent=incremental_parent, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.export_snapshot", + "run_on": get_primary_node(), + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @@ -3427,8 +3491,24 @@ class API_VM_Snapshot_Import(Resource): snapshot_name = reqargs.get("snapshot_name", None) import_path = reqargs.get("import_path", None) retain_snapshot = bool(strtobool(reqargs.get("retain_snapshot", "True"))) - return api_helper.import_vm_snapshot( - vm, snapshot_name, import_path, retain_snapshot + + task = run_celery_task( + "vm.import_snapshot", + domain=vm, + snapshot_name=snapshot_name, + import_path=import_path, + retain_snapshot=retain_snapshot, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.import_snapshot", + "run_on": get_primary_node(), + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py index 125c1d91..f36435b3 100644 --- a/client-cli/pvc/cli/cli.py +++ b/client-cli/pvc/cli/cli.py @@ -1758,7 +1758,7 @@ def cli_vm_flush_locks(domain, wait_flag): NOTE: This is a task-based command. The "--wait" flag (default) will block and show progress. 
Specifying the "--no-wait" flag will return immediately with a job ID instead, which can be queried externally later. """ - retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag) + retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag=wait_flag) if retcode and wait_flag: retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) @@ -1787,7 +1787,15 @@ def cli_vm_snapshot(): @connection_req @click.argument("domain") @click.argument("snapshot_name", required=False, default=None) -def cli_vm_snapshot_create(domain, snapshot_name): +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) +def cli_vm_snapshot_create(domain, snapshot_name, wait_flag): """ Create a snapshot of the disks and XML configuration of virtual machine DOMAIN, with the optional name SNAPSHOT_NAME. DOMAIN may be a UUID or name. @@ -1797,18 +1805,12 @@ def cli_vm_snapshot_create(domain, snapshot_name): VM at the moment of the snapshot. """ - echo( - CLI_CONFIG, - f"Taking snapshot of VM '{domain}'... ", - newline=False, - ) retcode, retmsg = pvc.lib.vm.vm_create_snapshot( - CLI_CONFIG, domain, snapshot_name=snapshot_name + CLI_CONFIG, domain, snapshot_name=snapshot_name, wait_flag=wait_flag ) - if retcode: - echo(CLI_CONFIG, "done.") - else: - echo(CLI_CONFIG, "failed.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -1819,23 +1821,27 @@ def cli_vm_snapshot_create(domain, snapshot_name): @connection_req @click.argument("domain") @click.argument("snapshot_name") +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt("Remove shapshot {snapshot_name} of VM {domain}") -def cli_vm_snapshot_remove(domain, snapshot_name): +def cli_vm_snapshot_remove(domain, snapshot_name, wait_flag): """ Remove the snapshot SNAPSHOT_NAME of the disks and XML configuration of virtual machine DOMAIN, DOMAIN may be a UUID or name. """ - echo( - CLI_CONFIG, - f"Removing snapshot '{snapshot_name}' of VM '{domain}'... ", - newline=False, + retcode, retmsg = pvc.lib.vm.vm_remove_snapshot( + CLI_CONFIG, domain, snapshot_name, wait_flag=wait_flag ) - retcode, retmsg = pvc.lib.vm.vm_remove_snapshot(CLI_CONFIG, domain, snapshot_name) - if retcode: - echo(CLI_CONFIG, "done.") - else: - echo(CLI_CONFIG, "failed.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -1848,25 +1854,29 @@ def cli_vm_snapshot_remove(domain, snapshot_name): @connection_req @click.argument("domain") @click.argument("snapshot_name") +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt( "Roll back to snapshot {snapshot_name} of {domain} and lose all data and changes since this snapshot" ) -def cli_vm_snapshot_rollback(domain, snapshot_name): +def cli_vm_snapshot_rollback(domain, snapshot_name, wait_flag): """ Roll back to the snapshot SNAPSHOT_NAME of the disks and XML configuration of virtual machine DOMAIN, DOMAIN may be a UUID or name. """ - echo( - CLI_CONFIG, - f"Rolling back to snapshot '{snapshot_name}' of VM '{domain}'... 
", - newline=False, + retcode, retmsg = pvc.lib.vm.vm_rollback_snapshot( + CLI_CONFIG, domain, snapshot_name, wait_flag=wait_flag ) - retcode, retmsg = pvc.lib.vm.vm_rollback_snapshot(CLI_CONFIG, domain, snapshot_name) - if retcode: - echo(CLI_CONFIG, "done.") - else: - echo(CLI_CONFIG, "failed.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -1887,7 +1897,17 @@ def cli_vm_snapshot_rollback(domain, snapshot_name): default=None, help="Perform an incremental volume export from this parent snapshot.", ) -def cli_vm_snapshot_export(domain, snapshot_name, export_path, incremental_parent): +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) +def cli_vm_snapshot_export( + domain, snapshot_name, export_path, incremental_parent, wait_flag +): """ Export the (existing) snapshot SNAPSHOT_NAME of virtual machine DOMAIN to the absolute path EXPORT_PATH on the current PVC primary coordinator. DOMAIN may be a UUID or name. @@ -1901,19 +1921,17 @@ def cli_vm_snapshot_export(domain, snapshot_name, export_path, incremental_paren Full export volume images are sparse-allocated, however it is recommended for safety to consider their maximum allocated size when allocated space for the EXPORT_PATH. Incremental volume images are generally small but are dependent entirely on the rate of data change in each volume. """ - _, primary_node = pvc.lib.cluster.get_primary_node(CLI_CONFIG) - echo( - CLI_CONFIG, - f'Exporting snapshot "{snapshot_name}" of VM "{domain}" to "{primary_node}:{export_path}"... ', - newline=False, - ) retcode, retmsg = pvc.lib.vm.vm_export_snapshot( - CLI_CONFIG, domain, snapshot_name, export_path, incremental_parent + CLI_CONFIG, + domain, + snapshot_name, + export_path, + incremental_parent=incremental_parent, + wait_flag=wait_flag, ) - if retcode: - echo(CLI_CONFIG, "done.") - else: - echo(CLI_CONFIG, "failed.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -1933,7 +1951,17 @@ def cli_vm_snapshot_export(domain, snapshot_name, export_path, incremental_paren default=True, help="Retain or remove restored (parent, if incremental) snapshot in Ceph.", ) -def cli_vm_snapshot_import(domain, snapshot_name, import_path, retain_snapshot): +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) +def cli_vm_snapshot_import( + domain, snapshot_name, import_path, retain_snapshot, wait_flag +): """ Import the snapshot SNAPSHOT_NAME of virtual machine DOMAIN from the absolute path IMPORT_PATH on the current PVC primary coordinator. DOMAIN may be a UUID or name. @@ -1949,18 +1977,17 @@ def cli_vm_snapshot_import(domain, snapshot_name, import_path, retain_snapshot): WARNING: The "-R"/"--remove-snapshot" option will invalidate any existing incremental snapshots based on the same incremental parent for the imported VM. """ - echo( - CLI_CONFIG, - f"Importing snapshot '{snapshot_name}' of VM '{domain}'... 
", - newline=False, - ) retcode, retmsg = pvc.lib.vm.vm_import_snapshot( - CLI_CONFIG, domain, snapshot_name, import_path, retain_snapshot + CLI_CONFIG, + domain, + snapshot_name, + import_path, + retain_snapshot=retain_snapshot, + wait_flag=wait_flag, ) - if retcode: - echo(CLI_CONFIG, "done.") - else: - echo(CLI_CONFIG, "failed.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py index 2d2b5225..c0602fb3 100644 --- a/client-cli/pvc/lib/vm.py +++ b/client-cli/pvc/lib/vm.py @@ -421,7 +421,7 @@ def vm_node(config, vm, target_node, action, force=False, wait=False, force_live return retstatus, response.json().get("message", "") -def vm_locks(config, vm, wait_flag): +def vm_locks(config, vm, wait_flag=True): """ Flush RBD locks of (stopped) VM @@ -498,7 +498,7 @@ def vm_restore(config, vm, backup_path, backup_datestring, retain_snapshot=False return True, response.json().get("message", "") -def vm_create_snapshot(config, vm, snapshot_name=None): +def vm_create_snapshot(config, vm, snapshot_name=None, wait_flag=True): """ Take a snapshot of a VM's disks and configuration @@ -513,13 +513,10 @@ def vm_create_snapshot(config, vm, snapshot_name=None): config, "post", "/vm/{vm}/snapshot".format(vm=vm), params=params ) - if response.status_code != 200: - return False, response.json().get("message", "") - else: - return True, response.json().get("message", "") + return get_wait_retdata(response, wait_flag) -def vm_remove_snapshot(config, vm, snapshot_name): +def vm_remove_snapshot(config, vm, snapshot_name, wait_flag=True): """ Remove a snapshot of a VM's disks and configuration @@ -532,13 +529,10 @@ def vm_remove_snapshot(config, vm, snapshot_name): config, "delete", "/vm/{vm}/snapshot".format(vm=vm), params=params ) - if response.status_code != 200: - return False, response.json().get("message", "") - else: - return True, response.json().get("message", "") + return get_wait_retdata(response, wait_flag) -def vm_rollback_snapshot(config, vm, snapshot_name): +def vm_rollback_snapshot(config, vm, snapshot_name, wait_flag=True): """ Roll back to a snapshot of a VM's disks and configuration @@ -551,13 +545,12 @@ def vm_rollback_snapshot(config, vm, snapshot_name): config, "post", "/vm/{vm}/snapshot/rollback".format(vm=vm), params=params ) - if response.status_code != 200: - return False, response.json().get("message", "") - else: - return True, response.json().get("message", "") + return get_wait_retdata(response, wait_flag) -def vm_export_snapshot(config, vm, snapshot_name, export_path, incremental_parent): +def vm_export_snapshot( + config, vm, snapshot_name, export_path, incremental_parent=None, wait_flag=True +): """ Export an (existing) snapshot of a VM's disks and configuration to export_path, optionally incremental with incremental_parent @@ -577,13 +570,12 @@ def vm_export_snapshot(config, vm, snapshot_name, export_path, incremental_paren config, "post", "/vm/{vm}/snapshot/export".format(vm=vm), params=params ) - if response.status_code != 200: - return False, response.json().get("message", "") - else: - return True, response.json().get("message", "") + return get_wait_retdata(response, wait_flag) -def vm_import_snapshot(config, vm, snapshot_name, import_path, retain_snapshot=False): +def vm_import_snapshot( + config, vm, snapshot_name, import_path, retain_snapshot=False, wait_flag=True +): """ Import a snapshot of {vm} and its volumes from a local primary coordinator filesystem 
path @@ -600,10 +592,7 @@ def vm_import_snapshot(config, vm, snapshot_name, import_path, retain_snapshot=F config, "post", "/vm/{vm}/snapshot/import".format(vm=vm), params=params ) - if response.status_code != 200: - return False, response.json().get("message", "") - else: - return True, response.json().get("message", "") + return get_wait_retdata(response, wait_flag) def vm_vcpus_set(config, vm, vcpus, topology, restart): diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 0852ebf4..fbf24546 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -1232,758 +1232,6 @@ def get_list( return True, sorted(vm_data_list, key=lambda d: d["name"]) -# -# VM Snapshot Tasks -# -def create_vm_snapshot(zkhandler, domain, snapshot_name=None, zk_only=False): - # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zkhandler, domain) - if not dom_uuid: - return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - - if snapshot_name is None: - now = datetime.now() - snapshot_name = now.strftime("%Y%m%d%H%M%S") - else: - reg = re.compile("^[a-z0-9.-_]+$") - if not reg.match(snapshot_name): - return ( - False, - f'ERROR: Snapshot name "{snapshot_name}" contains invalid characters; only alphanumeric, ".", "-", and "_" characters are allowed!', - ) - current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid)) - if current_snapshots and snapshot_name in current_snapshots: - return ( - False, - f'ERROR: Snapshot name "{snapshot_name}" already exists for VM "{domain}"!', - ) - - tstart = time.time() - - # Get the list of all RBD volumes - rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") - - snap_list = list() - - # If a snapshot fails, clean up any snapshots that were successfuly created - def cleanup_failure(): - for snapshot in snap_list: - rbd, snapshot_name = snapshot.split("@") - pool, volume = rbd.split("/") - # We capture no output here, because if this fails too we're in a deep - # error chain and will just ignore it - ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) - - # Iterrate through and create a snapshot for each RBD volume - for rbd in rbd_list: - pool, volume = rbd.split("/") - ret, msg = ceph.add_snapshot( - zkhandler, pool, volume, snapshot_name, zk_only=zk_only - ) - if not ret: - cleanup_failure() - return False, msg - else: - snap_list.append(f"{pool}/{volume}@{snapshot_name}") - - # Get the current domain XML - vm_config = zkhandler.read(("domain.xml", dom_uuid)) - - # Add the snapshot entry to Zookeeper - zkhandler.write( - [ - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.name", - snapshot_name, - ), - snapshot_name, - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.timestamp", - snapshot_name, - ), - tstart, - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.xml", - snapshot_name, - ), - vm_config, - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.rbd_snapshots", - snapshot_name, - ), - ",".join(snap_list), - ), - ] - ) - - tend = time.time() - ttot = round(tend - tstart, 2) - return ( - True, - f'Successfully created snapshot "{snapshot_name}" of VM "{domain}" in {ttot}s.', - ) - - -def remove_vm_snapshot(zkhandler, domain, snapshot_name): - # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zkhandler, domain) - if not dom_uuid: - return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - - if not zkhandler.exists( - ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) - ): - return ( - 
False, - f'ERROR: Could not find snapshot "{snapshot_name}" of VM "{domain}"!', - ) - - tstart = time.time() - - _snapshots = zkhandler.read( - ("domain.snapshots", dom_uuid, "domain_snapshot.rbd_snapshots", snapshot_name) - ) - rbd_snapshots = _snapshots.split(",") - for snap in rbd_snapshots: - rbd, name = snap.split("@") - pool, volume = rbd.split("/") - ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name) - if not ret: - return False, msg - - ret = zkhandler.delete( - ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) - ) - if not ret: - return ( - False, - f'ERROR: Failed to delete snapshot "{snapshot_name}" of VM "{domain}" in Zookeeper.', - ) - - tend = time.time() - ttot = round(tend - tstart, 2) - return ( - True, - f'Successfully removed snapshot "{snapshot_name}" of VM "{domain}" in {ttot}s.', - ) - - -def rollback_vm_snapshot(zkhandler, domain, snapshot_name): - # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zkhandler, domain) - if not dom_uuid: - return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - - # Verify that the VM is in a stopped state; renaming is not supported otherwise - state = zkhandler.read(("domain.state", dom_uuid)) - if state not in ["stop", "disable"]: - return ( - False, - 'ERROR: VM "{}" is not in stopped state; VMs cannot be rolled back while running.'.format( - domain - ), - ) - - # Verify that the snapshot exists - if not zkhandler.exists( - ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) - ): - return ( - False, - f'ERROR: Could not find snapshot "{snapshot_name}" of VM "{domain}"!', - ) - - tstart = time.time() - - _snapshots = zkhandler.read( - ("domain.snapshots", dom_uuid, "domain_snapshot.rbd_snapshots", snapshot_name) - ) - rbd_snapshots = _snapshots.split(",") - for snap in rbd_snapshots: - rbd, name = snap.split("@") - pool, volume = rbd.split("/") - ret, msg = ceph.rollback_snapshot(zkhandler, pool, volume, name) - if not ret: - return False, msg - - # Get the snapshot domain XML - vm_config = zkhandler.read( - ("domain.snapshots", dom_uuid, "domain_snapshot.xml", snapshot_name) - ) - - # Write the restored config to the main XML config - zkhandler.write( - [ - ( - ( - "domain.xml", - dom_uuid, - ), - vm_config, - ), - ] - ) - - tend = time.time() - ttot = round(tend - tstart, 2) - return ( - True, - f'Successfully rolled back to snapshot "{snapshot_name}" of VM "{domain}" in {ttot}s.', - ) - - -def export_vm_snapshot( - zkhandler, domain, snapshot_name, export_path, incremental_parent=None -): - # 0b. Validations part 1 - # Validate that the target path is valid - if not re.match(r"^/", export_path): - return ( - False, - f"ERROR: Target path {export_path} is not a valid absolute path on the primary coordinator!", - ) - - # Ensure that backup_path (on this node) exists - if not os.path.isdir(export_path): - return ( - False, - f"ERROR: Target path {export_path} does not exist!", - ) - - # 1a. Create destination directory - export_target_root = f"{export_path}/{domain}" - export_target_path = f"{export_path}/{domain}/{snapshot_name}/images" - if not os.path.isdir(export_target_path): - try: - os.makedirs(export_target_path) - except Exception as e: - return ( - False, - f"ERROR: Failed to create target directory {export_target_path}: {e}", - ) - - tstart = time.time() - export_type = "incremental" if incremental_parent is not None else "full" - - # 1b. 
Prepare export JSON writer (it will write on any result) - def write_export_json( - result=False, - result_message="", - vm_detail=None, - export_files=None, - export_files_size=0, - ttot=None, - ): - if ttot is None: - tend = time.time() - ttot = round(tend - tstart, 2) - - export_details = { - "type": export_type, - "snapshot_name": snapshot_name, - "incremental_parent": incremental_parent, - "result": result, - "result_message": result_message, - "runtime_secs": ttot, - "vm_detail": vm_detail, - "export_files": export_files, - "export_size_bytes": export_files_size, - } - with open(f"{export_target_root}/{snapshot_name}/snapshot.json", "w") as fh: - jdump(export_details, fh) - - # 2. Validations part 2 - # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zkhandler, domain) - if not dom_uuid: - error_message = f'Could not find VM "{domain}" in the cluster!' - write_export_json(result=False, result_message=f"ERROR: {error_message}") - return False, f"ERROR: {error_message}" - - # 3. Get information about VM - vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] - if not isinstance(vm_detail, dict): - error_message = f"VM listing returned invalid data: {vm_detail}" - write_export_json(result=False, result_message=f"ERROR: {error_message}") - return False, f"ERROR: {error_message}" - - # 4. Validate that the given snapshot exists (and incremental parent exists if applicable) - if not zkhandler.exists( - ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) - ): - error_message = ( - f'ERROR: Could not find snapshot "{snapshot_name}" of VM "{domain}"!', - ) - write_export_json(result=False, result_message=f"ERROR: {error_message}") - return False, f"ERROR: {error_message}" - - if incremental_parent is not None and not zkhandler.exists( - ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent) - ): - error_message = ( - f'ERROR: Could not find snapshot "{snapshot_name}" of VM "{domain}"!', - ) - write_export_json(result=False, result_message=f"ERROR: {error_message}") - return False, f"ERROR: {error_message}" - - # Get details about VM snapshot - _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many( - [ - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.name", - snapshot_name, - ) - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.timestamp", - snapshot_name, - ) - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.xml", - snapshot_name, - ) - ), - ( - ( - "domain.snapshots", - dom_uuid, - "domain_snapshot.rbd_snapshots", - snapshot_name, - ) - ), - ] - ) - - # Override the current XML with the snapshot XML; but all other metainfo is current - vm_detail["xml"] = snapshot_xml - - # Get the list of volumes - snapshot_volumes = list() - for rbdsnap in snapshot_rbdsnaps.split(","): - pool, _volume = rbdsnap.split("/") - volume, name = _volume.split("@") - ret, snapshots = ceph.get_list_snapshot( - zkhandler, pool, volume, limit=name, is_fuzzy=False - ) - if ret: - snapshot_volumes += snapshots - - # Set the export filetype - if incremental_parent is not None: - export_fileext = "rbddiff" - else: - export_fileext = "rbdimg" - - # 6. 
Dump snapshot to folder with `rbd export` (full) or `rbd export-diff` (incremental) - is_snapshot_export_failed = False - which_snapshot_export_failed = list() - export_files = list() - for snapshot_volume in snapshot_volumes: - pool = snapshot_volume["pool"] - volume = snapshot_volume["volume"] - snapshot_name = snapshot_volume["snapshot"] - size = snapshot_volume["stats"]["size"] - - if incremental_parent is not None: - retcode, stdout, stderr = common.run_os_command( - f"rbd export-diff --from-snap {incremental_parent} {pool}/{volume}@{snapshot_name} {export_target_path}/{pool}.{volume}.{export_fileext}" - ) - if retcode: - is_snapshot_export_failed = True - which_snapshot_export_failed.append(f"{pool}/{volume}") - else: - export_files.append((f"images/{pool}.{volume}.{export_fileext}", size)) - else: - retcode, stdout, stderr = common.run_os_command( - f"rbd export --export-format 2 {pool}/{volume}@{snapshot_name} {export_target_path}/{pool}.{volume}.{export_fileext}" - ) - if retcode: - is_snapshot_export_failed = True - which_snapshot_export_failed.append(f"{pool}/{volume}") - else: - export_files.append((f"images/{pool}.{volume}.{export_fileext}", size)) - - def get_dir_size(path): - total = 0 - with scandir(path) as it: - for entry in it: - if entry.is_file(): - total += entry.stat().st_size - elif entry.is_dir(): - total += get_dir_size(entry.path) - return total - - export_files_size = get_dir_size(export_target_path) - - if is_snapshot_export_failed: - error_message = f'Failed to export snapshot for volume(s) {", ".join(which_snapshot_export_failed)}' - write_export_json( - result=False, - result_message=f"ERROR: {error_message}", - ) - return ( - False, - f"ERROR: {error_message}", - ) - - tend = time.time() - ttot = round(tend - tstart, 2) - - retlines = list() - - myhostname = gethostname().split(".")[0] - result_message = f"Successfully exported VM '{domain}' snapshot '{snapshot_name}' ({export_type}) to '{myhostname}:{export_path}' in {ttot}s." - retlines.append(result_message) - - write_export_json( - result=True, - result_message=result_message, - vm_detail=vm_detail, - export_files=export_files, - export_files_size=export_files_size, - ttot=ttot, - ) - - return True, "\n".join(retlines) - - -def import_vm_snapshot( - zkhandler, domain, snapshot_name, import_path, retain_snapshot=False -): - tstart = time.time() - myhostname = gethostname().split(".")[0] - - # 0. Validations - # Validate that VM does not exist in cluster - dom_uuid = getDomainUUID(zkhandler, domain) - if dom_uuid: - return ( - False, - f'ERROR: VM "{domain}" already exists in the cluster! Remove or rename it before importing a snapshot.', - ) - - # Validate that the source path is valid - if not re.match(r"^/", import_path): - return ( - False, - f"ERROR: Source path {import_path} is not a valid absolute path on the primary coordinator!", - ) - - # Ensure that import_path (on this node) exists - if not os.path.isdir(import_path): - return False, f"ERROR: Source path {import_path} does not exist!" - - # Ensure that domain path (on this node) exists - vm_import_path = f"{import_path}/{domain}" - if not os.path.isdir(vm_import_path): - return False, f"ERROR: Source VM path {vm_import_path} does not exist!" - - # Ensure that the archives are present - export_source_snapshot_file = f"{vm_import_path}/{snapshot_name}/snapshot.json" - if not os.path.isfile(export_source_snapshot_file): - return False, "ERROR: The specified source export files do not exist!" - - # 1. 
Read the export file and get VM details - try: - with open(export_source_snapshot_file) as fh: - export_source_details = jload(fh) - except Exception as e: - return False, f"ERROR: Failed to read source export details: {e}" - - # Handle incrementals - incremental_parent = export_source_details.get("incremental_parent", None) - if incremental_parent is not None: - export_source_parent_snapshot_file = ( - f"{vm_import_path}/{incremental_parent}/snapshot.json" - ) - if not os.path.isfile(export_source_parent_snapshot_file): - return ( - False, - "ERROR: This export is incremental but the required incremental parent files do not exist at '{myhostname}:{vm_import_path}/{incremental_parent}'!", - ) - - try: - with open(export_source_parent_snapshot_file) as fh: - export_source_parent_details = jload(fh) - except Exception as e: - return ( - False, - f"ERROR: Failed to read source incremental parent export details: {e}", - ) - - # 4. Import volumes - is_snapshot_remove_failed = False - which_snapshot_remove_failed = list() - if incremental_parent is not None: - for volume_file, volume_size in export_source_details.get("export_files"): - volume_size = f"{volume_size}B" - pool, volume, _ = volume_file.split("/")[-1].split(".") - try: - parent_volume_file = [ - f[0] - for f in export_source_parent_details.get("export_files") - if f[0].split("/")[-1].replace(".rbdimg", "") - == volume_file.split("/")[-1].replace(".rbddiff", "") - ][0] - except Exception as e: - return ( - False, - f"ERROR: Failed to find parent volume for volume {pool}/{volume}; export may be corrupt or invalid: {e}", - ) - - # First we create the expected volumes then clean them up - # This process is a bit of a hack because rbd import does not expect an existing volume, - # but we need the information in PVC. 
- # Thus create the RBD volume using ceph.add_volume based on the export size, and then - # manually remove the RBD volume (leaving the PVC metainfo) - retcode, retmsg = ceph.add_volume(zkhandler, pool, volume, volume_size) - if not retcode: - return False, f"ERROR: Failed to create imported volume: {retmsg}" - - retcode, stdout, stderr = common.run_os_command( - f"rbd remove {pool}/{volume}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to remove temporary RBD volume '{pool}/{volume}': {stderr}", - ) - - # Next we import the parent image - retcode, stdout, stderr = common.run_os_command( - f"rbd import --export-format 2 --dest-pool {pool} {import_path}/{domain}/{incremental_parent}/{parent_volume_file} {volume}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to import parent export image {parent_volume_file}: {stderr}", - ) - - # Import VM config and metadata in import state, from the *source* details - try: - retcode, retmsg = define_vm( - zkhandler, - export_source_parent_details["vm_detail"]["xml"], - export_source_parent_details["vm_detail"]["node"], - export_source_parent_details["vm_detail"]["node_limit"], - export_source_parent_details["vm_detail"]["node_selector"], - export_source_parent_details["vm_detail"]["node_autostart"], - export_source_parent_details["vm_detail"]["migration_method"], - export_source_parent_details["vm_detail"]["migration_max_downtime"], - export_source_parent_details["vm_detail"]["profile"], - export_source_parent_details["vm_detail"]["tags"], - "import", - ) - if not retcode: - return False, f"ERROR: Failed to define imported VM: {retmsg}" - except Exception as e: - return False, f"ERROR: Failed to parse VM export details: {e}" - - # Handle the VM snapshots - if retain_snapshot: - # Create the parent snapshot - retcode, retmsg = create_vm_snapshot( - zkhandler, domain, snapshot_name=incremental_parent, zk_only=True - ) - if not retcode: - return ( - False, - f"ERROR: Failed to create imported snapshot for {incremental_parent} (parent): {retmsg}", - ) - - for volume_file, volume_size in export_source_details.get("export_files"): - volume_size = f"{volume_size}B" - pool, volume, _ = volume_file.split("/")[-1].split(".") - # Then we import the incremental diffs - retcode, stdout, stderr = common.run_os_command( - f"rbd import-diff {import_path}/{domain}/{snapshot_name}/{volume_file} {pool}/{volume}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to import incremental export image {volume_file}: {stderr}", - ) - - if not retain_snapshot: - retcode, stdout, stderr = common.run_os_command( - f"rbd snap rm {pool}/{volume}@{incremental_parent}" - ) - if retcode: - is_snapshot_remove_failed = True - which_snapshot_remove_failed.append(f"{pool}/{volume}") - - retcode, stdout, stderr = common.run_os_command( - f"rbd snap rm {pool}/{volume}@{snapshot_name}" - ) - if retcode: - is_snapshot_remove_failed = True - which_snapshot_remove_failed.append(f"{pool}/{volume}") - - # Now update VM config and metadata, from the *current* details - try: - retcode, retmsg = modify_vm( - zkhandler, - domain, - False, - export_source_details["vm_detail"]["xml"], - ) - if not retcode: - return False, f"ERROR: Failed to modify imported VM: {retmsg}" - - retcode, retmsg = move_vm( - zkhandler, - domain, - export_source_details["vm_detail"]["node"], - ) - if not retcode: - # We don't actually care if this fails, because it just means the vm was never moved - pass - - retcode, retmsg = modify_vm_metadata( - zkhandler, - domain, - 
export_source_details["vm_detail"]["node_limit"], - export_source_details["vm_detail"]["node_selector"], - export_source_details["vm_detail"]["node_autostart"], - export_source_details["vm_detail"]["profile"], - export_source_details["vm_detail"]["migration_method"], - export_source_details["vm_detail"]["migration_max_downtime"], - ) - if not retcode: - return False, f"ERROR: Failed to modify imported VM: {retmsg}" - except Exception as e: - return False, f"ERROR: Failed to parse VM export details: {e}" - - if retain_snapshot: - # Create the child snapshot - retcode, retmsg = create_vm_snapshot( - zkhandler, domain, snapshot_name=snapshot_name, zk_only=True - ) - if not retcode: - return ( - False, - f"ERROR: Failed to create imported snapshot for {snapshot_name}: {retmsg}", - ) - else: - for volume_file, volume_size in export_source_details.get("export_files"): - volume_size = f"{volume_size}B" - pool, volume, _ = volume_file.split("/")[-1].split(".") - - # First we create the expected volumes then clean them up - # This process is a bit of a hack because rbd import does not expect an existing volume, - # but we need the information in PVC. - # Thus create the RBD volume using ceph.add_volume based on the export size, and then - # manually remove the RBD volume (leaving the PVC metainfo) - retcode, retmsg = ceph.add_volume(zkhandler, pool, volume, volume_size) - if not retcode: - return False, f"ERROR: Failed to create imported volume: {retmsg}" - - retcode, stdout, stderr = common.run_os_command( - f"rbd remove {pool}/{volume}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to remove temporary RBD volume '{pool}/{volume}': {stderr}", - ) - - # Then we perform the actual import - retcode, stdout, stderr = common.run_os_command( - f"rbd import --export-format 2 --dest-pool {pool} {import_path}/{domain}/{snapshot_name}/{volume_file} {volume}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to import export image {volume_file}: {stderr}", - ) - - if not retain_snapshot: - retcode, stdout, stderr = common.run_os_command( - f"rbd snap rm {pool}/{volume}@{snapshot_name}" - ) - if retcode: - return ( - False, - f"ERROR: Failed to remove imported image snapshot for {volume_file}: {stderr}", - ) - - # 2. Import VM config and metadata in provision state - try: - retcode, retmsg = define_vm( - zkhandler, - export_source_details["vm_detail"]["xml"], - export_source_details["vm_detail"]["node"], - export_source_details["vm_detail"]["node_limit"], - export_source_details["vm_detail"]["node_selector"], - export_source_details["vm_detail"]["node_autostart"], - export_source_details["vm_detail"]["migration_method"], - export_source_details["vm_detail"]["migration_max_downtime"], - export_source_details["vm_detail"]["profile"], - export_source_details["vm_detail"]["tags"], - "import", - ) - if not retcode: - return False, f"ERROR: Failed to define imported VM: {retmsg}" - except Exception as e: - return False, f"ERROR: Failed to parse VM export details: {e}" - - # Finally we handle the VM snapshot - if retain_snapshot: - retcode, retmsg = create_vm_snapshot( - zkhandler, domain, snapshot_name=snapshot_name, zk_only=True - ) - if not retcode: - return ( - False, - f"ERROR: Failed to create imported snapshot for {snapshot_name}: {retmsg}", - ) - - # 5. 
Start VM - retcode, retmsg = start_vm(zkhandler, domain) - if not retcode: - return False, f"ERROR: Failed to start imported VM {domain}: {retmsg}" - - tend = time.time() - ttot = round(tend - tstart, 2) - retlines = list() - - if is_snapshot_remove_failed: - retlines.append( - f"WARNING: Failed to remove hanging snapshot(s) as requested for volume(s) {', '.join(which_snapshot_remove_failed)}" - ) - - retlines.append( - f"Successfully imported VM '{domain}' at snapshot '{snapshot_name}' from '{myhostname}:{import_path}' in {ttot}s." - ) - - return True, "\n".join(retlines) - - # # VM Backup Tasks # @@ -2644,7 +1892,6 @@ def vm_worker_helper_getdom(tuuid): def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False): current_stage = 0 total_stages = 3 - start( celery, f"Flushing RBD locks for VM {domain} [forced={force_unlock}]", @@ -2743,7 +1990,6 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False): def vm_worker_attach_device(zkhandler, celery, domain, xml_spec): current_stage = 0 total_stages = 1 - start( celery, f"Hot-attaching XML device to VM {domain}", @@ -2787,12 +2033,11 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec): def vm_worker_detach_device(zkhandler, celery, domain, xml_spec): current_stage = 0 total_stages = 1 - start( celery, f"Hot-detaching XML device from VM {domain}", current=current_stage, - total_stages=total_stages, + total=total_stages, ) dom_uuid = getDomainUUID(zkhandler, domain) @@ -2824,5 +2069,1019 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec): celery, f"Successfully hot-detached XML device from VM {domain}", current=current_stage, - total_stages=total_stages, + total=total_stages, + ) + + +def vm_worker_create_snapshot( + zkhandler, celery, domain, snapshot_name=None, zk_only=False +): + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return + + if snapshot_name is None: + now = datetime.now() + snapshot_name = now.strftime("%Y%m%d%H%M%S") + else: + reg = re.compile("^[a-z0-9.-_]+$") + if not reg.match(snapshot_name): + fail( + celery, + "Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed", + ) + return + + current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid)) + if current_snapshots and snapshot_name in current_snapshots: + fail( + celery, + f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!", + ) + return + + # Get the list of all RBD volumes + rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") + + current_stage = 0 + total_stages = 2 + len(rbd_list) + start( + celery, + f"Creating snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + snap_list = list() + + # If a snapshot fails, clean up any snapshots that were successfuly created + def cleanup_failure(): + for snapshot in snap_list: + rbd, snapshot_name = snapshot.split("@") + pool, volume = rbd.split("/") + # We capture no output here, because if this fails too we're in a deep + # error chain and will just ignore it + ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) + + # Iterrate through and create a snapshot for each RBD volume + for rbd in rbd_list: + current_stage += 1 + update( + celery, + f"Creating RBD snapshot of {rbd}", + current=current_stage, + total=total_stages, + ) + + pool, volume = rbd.split("/") + ret, 
msg = ceph.add_snapshot( + zkhandler, pool, volume, snapshot_name, zk_only=zk_only + ) + if not ret: + cleanup_failure() + fail( + celery, + msg.replace("ERROR: ", ""), + ) + return + else: + snap_list.append(f"{pool}/{volume}@{snapshot_name}") + + current_stage += 1 + update( + celery, + "Creating VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + # Get the current timestamp + tstart = time.time() + # Get the current domain XML + vm_config = zkhandler.read(("domain.xml", dom_uuid)) + + # Add the snapshot entry to Zookeeper + zkhandler.write( + [ + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.name", + snapshot_name, + ), + snapshot_name, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.timestamp", + snapshot_name, + ), + tstart, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.xml", + snapshot_name, + ), + vm_config, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.rbd_snapshots", + snapshot_name, + ), + ",".join(snap_list), + ), + ] + ) + + current_stage += 1 + return finish( + celery, + f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + +def vm_worker_remove_snapshot(zkhandler, celery, domain, snapshot_name): + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return + + if not zkhandler.exists( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) + ): + fail( + celery, + f"Could not find snapshot '{snapshot_name}' of VM '{domain}'!", + ) + return + + _snapshots = zkhandler.read( + ("domain.snapshots", dom_uuid, "domain_snapshot.rbd_snapshots", snapshot_name) + ) + rbd_snapshots = _snapshots.split(",") + + current_stage = 0 + total_stages = 2 + len(rbd_snapshots) + start( + celery, + f"Removing snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + for snap in rbd_snapshots: + current_stage += 1 + update( + celery, + f"Removing RBD snapshot {snap}", + current=current_stage, + total=total_stages, + ) + + rbd, name = snap.split("@") + pool, volume = rbd.split("/") + ret, msg = ceph.remove_snapshot(zkhandler, pool, volume, name) + if not ret: + fail( + celery, + msg.replace("ERROR: ", ""), + ) + return + + current_stage += 1 + update( + celery, + "Deleting VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + ret = zkhandler.delete( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) + ) + if not ret: + return ( + False, + f'ERROR: Failed to delete snapshot "{snapshot_name}" of VM "{domain}" in Zookeeper.', + ) + + current_stage += 1 + return finish( + celery, + f"Successfully removed snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + +def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name): + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return + + # Verify that the VM is in a stopped state; renaming is not supported otherwise + state = zkhandler.read(("domain.state", dom_uuid)) + if state not in ["stop", "disable"]: + fail( + celery, + f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running", + ) + return + + # Verify that the snapshot exists + if not 
zkhandler.exists( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) + ): + fail( + celery, + f"Could not find snapshot '{snapshot_name}' of VM '{domain}'", + ) + + _snapshots = zkhandler.read( + ("domain.snapshots", dom_uuid, "domain_snapshot.rbd_snapshots", snapshot_name) + ) + rbd_snapshots = _snapshots.split(",") + + current_stage = 0 + total_stages = 2 + len(rbd_snapshots) + start( + celery, + f"Rolling back to snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + for snap in rbd_snapshots: + current_stage += 1 + update( + celery, + f"Rolling back RBD snapshot {snap}", + current=current_stage, + total=total_stages, + ) + + rbd, name = snap.split("@") + pool, volume = rbd.split("/") + ret, msg = ceph.rollback_snapshot(zkhandler, pool, volume, name) + if not ret: + fail( + celery, + msg.replace("ERROR: ", ""), + ) + return + + current_stage += 1 + update( + celery, + "Rolling back VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + # Get the snapshot domain XML + vm_config = zkhandler.read( + ("domain.snapshots", dom_uuid, "domain_snapshot.xml", snapshot_name) + ) + + # Write the restored config to the main XML config + zkhandler.write( + [ + ( + ( + "domain.xml", + dom_uuid, + ), + vm_config, + ), + ] + ) + + current_stage += 1 + return finish( + celery, + f"Successfully rolled back to snapshot '{snapshot_name}' of VM '{domain}'", + current=current_stage, + total=total_stages, + ) + + +def vm_worker_export_snapshot( + zkhandler, celery, domain, snapshot_name, export_path, incremental_parent=None +): + # Validate that the target path is valid + if not re.match(r"^/", export_path): + fail( + celery, + f"Target path '{export_path}' is not a valid absolute path", + ) + return + + # Ensure that backup_path (on this node) exists + myhostname = gethostname().split(".")[0] + if not os.path.isdir(export_path): + fail( + celery, + f"ERROR: Target path '{export_path}' does not exist on node '{myhostname}'", + ) + return + + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return + + if not zkhandler.exists( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name) + ): + fail( + celery, + f"Could not find snapshot '{snapshot_name}' of VM '{domain}'", + ) + return + + if incremental_parent is not None and not zkhandler.exists( + ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent) + ): + fail( + celery, + f"Could not find snapshot '{snapshot_name}' of VM '{domain}'", + ) + return + + # Get details about VM snapshot + _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many( + [ + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.name", + snapshot_name, + ) + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.timestamp", + snapshot_name, + ) + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.xml", + snapshot_name, + ) + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.rbd_snapshots", + snapshot_name, + ) + ), + ] + ) + snapshot_rbdsnaps = snapshot_rbdsnaps.split(",") + + current_stage = 0 + total_stages = 2 + len(snapshot_rbdsnaps) + start( + celery, + f"Exporting snapshot '{snapshot_name}' of VM '{domain}' to '{export_path}'", + current=current_stage, + total=total_stages, + ) + + # Create destination directory + export_target_path = 
f"{export_path}/{domain}/{snapshot_name}/images" + try: + os.makedirs(export_target_path) + except Exception as e: + fail( + celery, + f"Failed to create target directory '{export_target_path}': {e}", + ) + return + + def export_cleanup(): + from shutil import rmtree + + rmtree(f"{export_path}/{domain}/{snapshot_name}") + + export_type = "incremental" if incremental_parent is not None else "full" + + # Get information about VM + vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] + if not isinstance(vm_detail, dict): + fail(celery, f"VM listing returned invalid data: {vm_detail}") + return + + # Override the current XML with the snapshot XML; but all other metainfo is current + vm_detail["xml"] = snapshot_xml + + # Get the list of volumes + snapshot_volumes = list() + for rbdsnap in snapshot_rbdsnaps: + pool, _volume = rbdsnap.split("/") + volume, name = _volume.split("@") + ret, snapshots = ceph.get_list_snapshot( + zkhandler, pool, volume, limit=name, is_fuzzy=False + ) + if ret: + snapshot_volumes += snapshots + + # Set the export filetype + if incremental_parent is not None: + export_fileext = "rbddiff" + else: + export_fileext = "rbdimg" + + # Dump snapshot to folder with `rbd export` (full) or `rbd export-diff` (incremental) + export_files = list() + for snapshot_volume in snapshot_volumes: + pool = snapshot_volume["pool"] + volume = snapshot_volume["volume"] + snapshot_name = snapshot_volume["snapshot"] + size = snapshot_volume["stats"]["size"] + snap = f"{pool}/{volume}@{snapshot_name}" + + current_stage += 1 + update( + celery, + f"Exporting RBD snapshot {snap}", + current=current_stage, + total=total_stages, + ) + + if incremental_parent is not None: + retcode, stdout, stderr = common.run_os_command( + f"rbd export-diff --from-snap {incremental_parent} {pool}/{volume}@{snapshot_name} {export_target_path}/{pool}.{volume}.{export_fileext}" + ) + if retcode: + export_cleanup() + fail( + celery, f"Failed to export snapshot for volume(s) '{pool}/{volume}'" + ) + return + else: + export_files.append((f"images/{pool}.{volume}.{export_fileext}", size)) + else: + retcode, stdout, stderr = common.run_os_command( + f"rbd export --export-format 2 {pool}/{volume}@{snapshot_name} {export_target_path}/{pool}.{volume}.{export_fileext}" + ) + if retcode: + export_cleanup() + fail( + celery, f"Failed to export snapshot for volume(s) '{pool}/{volume}'" + ) + return + else: + export_files.append((f"images/{pool}.{volume}.{export_fileext}", size)) + + current_stage += 1 + update( + celery, + "Writing snapshot details", + current=current_stage, + total=total_stages, + ) + + def get_dir_size(path): + total = 0 + with scandir(path) as it: + for entry in it: + if entry.is_file(): + total += entry.stat().st_size + elif entry.is_dir(): + total += get_dir_size(entry.path) + return total + + export_files_size = get_dir_size(export_target_path) + + export_details = { + "type": export_type, + "snapshot_name": snapshot_name, + "incremental_parent": incremental_parent, + "vm_detail": vm_detail, + "export_files": export_files, + "export_size_bytes": export_files_size, + } + try: + with open(f"{export_path}/{domain}/{snapshot_name}/snapshot.json", "w") as fh: + jdump(export_details, fh) + except Exception as e: + export_cleanup() + fail(celery, f"Failed to export configuration snapshot: {e}") + return + + current_stage += 1 + return finish( + celery, + f"Successfully exported snapshot '{snapshot_name}' of VM '{domain}' to '{export_path}'", + current=current_stage, + total=total_stages, + ) + + 
+def vm_worker_import_snapshot( + zkhandler, celery, domain, snapshot_name, import_path, retain_snapshot=True +): + myhostname = gethostname().split(".")[0] + + # 0. Validations + # Validate that VM does not exist in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if dom_uuid: + fail( + celery, + f'VM "{domain}" already exists in the cluster; remove or rename it before importing a snapshot', + ) + return + + # Validate that the source path is valid + if not re.match(r"^/", import_path): + fail( + celery, + f"Source path '{import_path}; is not a valid absolute path", + ) + return + + # Ensure that import_path (on this node) exists + if not os.path.isdir(import_path): + fail( + celery, + f"Source path '{import_path}' does not exist on node '{myhostname}'", + ) + return + + # Ensure that domain path (on this node) exists + vm_import_path = f"{import_path}/{domain}" + if not os.path.isdir(vm_import_path): + fail(celery, f"Source VM path '{vm_import_path}' does not exist") + return + + # Ensure that the archives are present + export_source_snapshot_file = f"{vm_import_path}/{snapshot_name}/snapshot.json" + if not os.path.isfile(export_source_snapshot_file): + fail( + celery, f"ERROR: The specified source export '{snapshot_name}' do not exist" + ) + return + + # Read the export file and get VM details + try: + with open(export_source_snapshot_file) as fh: + export_source_details = jload(fh) + except Exception as e: + fail( + celery, + f"Failed to read source export details: {e}", + ) + return + + # Handle incrementals + incremental_parent = export_source_details.get("incremental_parent", None) + if incremental_parent is not None: + export_source_parent_snapshot_file = ( + f"{vm_import_path}/{incremental_parent}/snapshot.json" + ) + if not os.path.isfile(export_source_parent_snapshot_file): + fail( + celery, + f"Export is incremental but required incremental parent files do not exist at '{myhostname}:{vm_import_path}/{incremental_parent}'", + ) + return + + try: + with open(export_source_parent_snapshot_file) as fh: + export_source_parent_details = jload(fh) + except Exception as e: + fail( + celery, + f"Failed to read source incremental parent export details: {e}", + ) + return + + current_stage = 0 + total_stages = 2 + total_stages += 2 * len(export_source_details.get("export_files")) + if incremental_parent is not None: + total_stages += 1 + total_stages += len(export_source_parent_details.get("export_files")) + start( + celery, + f"Importing snapshot '{snapshot_name}' of VM '{domain}' from '{import_path}'", + current=current_stage, + total=total_stages, + ) + + # 4. Import volumes + if incremental_parent is not None: + for volume_file, volume_size in export_source_details.get("export_files"): + volume_size = f"{volume_size}B" + pool, volume, _ = volume_file.split("/")[-1].split(".") + + try: + parent_volume_file = [ + f[0] + for f in export_source_parent_details.get("export_files") + if f[0].split("/")[-1].replace(".rbdimg", "") + == volume_file.split("/")[-1].replace(".rbddiff", "") + ][0] + except Exception as e: + fail( + celery, + f"Failed to find parent volume for volume {pool}/{volume}; export may be corrupt or invalid: {e}", + ) + return + + # First we create the expected volumes then clean them up + # This process is a bit of a hack because rbd import does not expect an existing volume, + # but we need the information in PVC. 
+ # Thus create the RBD volume using ceph.add_volume based on the export size, and then + # manually remove the RBD volume (leaving the PVC metainfo) + current_stage += 1 + update( + celery, + f"Preparing RBD volume {pool}/{volume}", + current=current_stage, + total=total_stages, + ) + + retcode, retmsg = ceph.add_volume(zkhandler, pool, volume, volume_size) + if not retcode: + fail(celery, f"Failed to create imported volume: {retmsg}") + return + + retcode, stdout, stderr = common.run_os_command( + f"rbd remove {pool}/{volume}" + ) + if retcode: + fail( + celery, + f"Failed to remove temporary RBD volume '{pool}/{volume}': {stderr}", + ) + return + + current_stage += 1 + update( + celery, + f"Importing RBD snapshot {pool}/{volume}@{incremental_parent}", + current=current_stage, + total=total_stages, + ) + + # Next we import the parent image + retcode, stdout, stderr = common.run_os_command( + f"rbd import --export-format 2 --dest-pool {pool} {import_path}/{domain}/{incremental_parent}/{parent_volume_file} {volume}" + ) + if retcode: + fail( + celery, + f"Failed to import parent export image {parent_volume_file}: {stderr}", + ) + return + + # Import VM config and metadata in import state, from the *source* details + current_stage += 1 + update( + celery, + f"Importing VM configuration snapshot {incremental_parent}", + current=current_stage, + total=total_stages, + ) + + try: + retcode, retmsg = define_vm( + zkhandler, + export_source_parent_details["vm_detail"]["xml"], + export_source_parent_details["vm_detail"]["node"], + export_source_parent_details["vm_detail"]["node_limit"], + export_source_parent_details["vm_detail"]["node_selector"], + export_source_parent_details["vm_detail"]["node_autostart"], + export_source_parent_details["vm_detail"]["migration_method"], + export_source_parent_details["vm_detail"]["migration_max_downtime"], + export_source_parent_details["vm_detail"]["profile"], + export_source_parent_details["vm_detail"]["tags"], + "import", + ) + if not retcode: + fail( + celery, + f"Failed to define imported VM: {retmsg}", + ) + return + except Exception as e: + fail( + celery, + f"Failed to parse VM export details: {e}", + ) + return + + # Handle the VM snapshots + if retain_snapshot: + current_stage += 1 + update( + celery, + "Recreating incremental parent snapshot", + current=current_stage, + total=total_stages, + ) + + # Create the parent snapshot + retcode, retmsg = vm_worker_create_snapshot( + zkhandler, None, domain, snapshot_name=incremental_parent, zk_only=True + ) + if not retcode: + fail( + celery, + f"Failed to create imported snapshot for {incremental_parent} (parent): {retmsg}", + ) + return + + for volume_file, volume_size in export_source_details.get("export_files"): + current_stage += 1 + update( + celery, + f"Importing RBD snapshot {pool}/{volume}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + volume_size = f"{volume_size}B" + pool, volume, _ = volume_file.split("/")[-1].split(".") + # Then we import the incremental diffs + retcode, stdout, stderr = common.run_os_command( + f"rbd import-diff {import_path}/{domain}/{snapshot_name}/{volume_file} {pool}/{volume}" + ) + if retcode: + fail( + celery, + f"Failed to import incremental export image {volume_file}: {stderr}", + ) + return + + if not retain_snapshot: + retcode, stdout, stderr = common.run_os_command( + f"rbd snap rm {pool}/{volume}@{incremental_parent}" + ) + if retcode: + fail( + celery, + f"Failed to remove imported image snapshot '{pool}/{volume}@{incremental_parent}': 
+                        f"Failed to remove imported image snapshot '{pool}/{volume}@{incremental_parent}': {stderr}",
+                    )
+                    return
+
+                retcode, stdout, stderr = common.run_os_command(
+                    f"rbd snap rm {pool}/{volume}@{snapshot_name}"
+                )
+                if retcode:
+                    fail(
+                        celery,
+                        f"Failed to remove imported image snapshot '{pool}/{volume}@{snapshot_name}': {stderr}",
+                    )
+                    return
+
+        # Now update VM config and metadata, from the *current* details
+        current_stage += 1
+        update(
+            celery,
+            f"Importing VM configuration snapshot {snapshot_name}",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        try:
+            retcode, retmsg = modify_vm(
+                zkhandler,
+                domain,
+                False,
+                export_source_details["vm_detail"]["xml"],
+            )
+            if not retcode:
+                fail(
+                    celery,
+                    f"Failed to modify imported VM: {retmsg}",
+                )
+                return
+
+            retcode, retmsg = move_vm(
+                zkhandler,
+                domain,
+                export_source_details["vm_detail"]["node"],
+            )
+            if not retcode:
+                # We don't actually care if this fails, because it just means the vm was never moved
+                pass
+
+            retcode, retmsg = modify_vm_metadata(
+                zkhandler,
+                domain,
+                export_source_details["vm_detail"]["node_limit"],
+                export_source_details["vm_detail"]["node_selector"],
+                export_source_details["vm_detail"]["node_autostart"],
+                export_source_details["vm_detail"]["profile"],
+                export_source_details["vm_detail"]["migration_method"],
+                export_source_details["vm_detail"]["migration_max_downtime"],
+            )
+            if not retcode:
+                fail(
+                    celery,
+                    f"Failed to modify imported VM: {retmsg}",
+                )
+                return
+        except Exception as e:
+            fail(
+                celery,
+                f"Failed to parse VM export details: {e}",
+            )
+            return
+
+        if retain_snapshot:
+            current_stage += 1
+            update(
+                celery,
+                "Recreating imported snapshot",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            # Create the child snapshot
+            retcode, retmsg = vm_worker_create_snapshot(
+                zkhandler, None, domain, snapshot_name=snapshot_name, zk_only=True
+            )
+            if not retcode:
+                fail(
+                    celery,
+                    f"Failed to create imported snapshot for {snapshot_name}: {retmsg}",
+                )
+                return
+    else:
+        for volume_file, volume_size in export_source_details.get("export_files"):
+            volume_size = f"{volume_size}B"
+            pool, volume, _ = volume_file.split("/")[-1].split(".")
+
+            # First we create the expected volumes then clean them up
+            # This process is a bit of a hack because rbd import does not expect an existing volume,
+            # but we need the information in PVC.
+            # Thus create the RBD volume using ceph.add_volume based on the export size, and then
+            # manually remove the RBD volume (leaving the PVC metainfo)
+            current_stage += 1
+            update(
+                celery,
+                f"Preparing RBD volume {pool}/{volume}",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            retcode, retmsg = ceph.add_volume(zkhandler, pool, volume, volume_size)
+            if not retcode:
+                fail(
+                    celery,
+                    f"Failed to create imported volume: {retmsg}",
+                )
+                return
+
+            retcode, stdout, stderr = common.run_os_command(
+                f"rbd remove {pool}/{volume}"
+            )
+            if retcode:
+                fail(
+                    celery,
+                    f"Failed to remove temporary RBD volume '{pool}/{volume}': {stderr}",
+                )
+                return
+
+            # Then we perform the actual import
+            current_stage += 1
+            update(
+                celery,
+                f"Importing RBD snapshot {pool}/{volume}@{snapshot_name}",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            retcode, stdout, stderr = common.run_os_command(
+                f"rbd import --export-format 2 --dest-pool {pool} {import_path}/{domain}/{snapshot_name}/{volume_file} {volume}"
+            )
+            if retcode:
+                fail(
+                    celery,
+                    f"Failed to import export image {volume_file}: {stderr}",
+                )
+                return
+
+            if not retain_snapshot:
+                retcode, stdout, stderr = common.run_os_command(
+                    f"rbd snap rm {pool}/{volume}@{snapshot_name}"
+                )
+                if retcode:
+                    fail(
+                        celery,
+                        f"Failed to remove imported image snapshot '{pool}/{volume}@{snapshot_name}': {stderr}",
+                    )
+                    return
+
+        # Import VM config and metadata in provision state
+        current_stage += 1
+        update(
+            celery,
+            f"Importing VM configuration snapshot {snapshot_name}",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        try:
+            retcode, retmsg = define_vm(
+                zkhandler,
+                export_source_details["vm_detail"]["xml"],
+                export_source_details["vm_detail"]["node"],
+                export_source_details["vm_detail"]["node_limit"],
+                export_source_details["vm_detail"]["node_selector"],
+                export_source_details["vm_detail"]["node_autostart"],
+                export_source_details["vm_detail"]["migration_method"],
+                export_source_details["vm_detail"]["migration_max_downtime"],
+                export_source_details["vm_detail"]["profile"],
+                export_source_details["vm_detail"]["tags"],
+                "import",
+            )
+            if not retcode:
+                fail(
+                    celery,
+                    f"ERROR: Failed to define imported VM: {retmsg}",
+                )
+                return
+        except Exception as e:
+            fail(
+                celery,
+                f"ERROR: Failed to parse VM export details: {e}",
+            )
+            return
+
+        # Finally we handle the VM snapshot
+        if retain_snapshot:
+            current_stage += 1
+            update(
+                celery,
+                "Recreating imported snapshot",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            retcode, retmsg = vm_worker_create_snapshot(
+                zkhandler, None, domain, snapshot_name=snapshot_name, zk_only=True
+            )
+            if not retcode:
+                fail(
+                    celery,
+                    f"Failed to create imported snapshot for {snapshot_name}: {retmsg}",
+                )
+                return
+
+    # 5. Start VM
+    retcode, retmsg = start_vm(zkhandler, domain)
+    if not retcode:
+        fail(
+            celery,
+            f"ERROR: Failed to start imported VM {domain}: {retmsg}",
+        )
+        return
+
+    return finish(
+        celery,
+        f"Successfully imported VM '{domain}' at snapshot '{snapshot_name}' from '{myhostname}:{import_path}'",
+        current=current_stage,
+        total=total_stages,
    )
diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py
index 362d926e..ba2007a6 100755
--- a/worker-daemon/pvcworkerd/Daemon.py
+++ b/worker-daemon/pvcworkerd/Daemon.py
@@ -28,6 +28,11 @@ from daemon_lib.vm import (
    vm_worker_flush_locks,
    vm_worker_attach_device,
    vm_worker_detach_device,
+    vm_worker_create_snapshot,
+    vm_worker_remove_snapshot,
+    vm_worker_rollback_snapshot,
+    vm_worker_export_snapshot,
+    vm_worker_import_snapshot,
)
from daemon_lib.ceph import (
    osd_worker_add_osd,
@@ -123,6 +128,87 @@ def vm_device_detach(self, domain=None, xml=None, run_on=None):
    return run_vm_device_detach(self, domain, xml)


+@celery.task(name="vm.create_snapshot", bind=True, routing_key="run_on")
+def vm_create_snapshot(self, domain=None, snapshot_name=None, run_on="primary"):
+    @ZKConnection(config)
+    def run_vm_create_snapshot(zkhandler, self, domain, snapshot_name):
+        return vm_worker_create_snapshot(zkhandler, self, domain, snapshot_name)
+
+    return run_vm_create_snapshot(self, domain, snapshot_name)
+
+
+@celery.task(name="vm.remove_snapshot", bind=True, routing_key="run_on")
+def vm_remove_snapshot(self, domain=None, snapshot_name=None, run_on="primary"):
+    @ZKConnection(config)
+    def run_vm_remove_snapshot(zkhandler, self, domain, snapshot_name):
+        return vm_worker_remove_snapshot(zkhandler, self, domain, snapshot_name)
+
+    return run_vm_remove_snapshot(self, domain, snapshot_name)
+
+
+@celery.task(name="vm.rollback_snapshot", bind=True, routing_key="run_on")
+def vm_rollback_snapshot(self, domain=None, snapshot_name=None, run_on="primary"):
+    @ZKConnection(config)
+    def run_vm_rollback_snapshot(zkhandler, self, domain, snapshot_name):
+        return vm_worker_rollback_snapshot(zkhandler, self, domain, snapshot_name)
+
+    return run_vm_rollback_snapshot(self, domain, snapshot_name)
+
+
+@celery.task(name="vm.export_snapshot", bind=True, routing_key="run_on")
+def vm_export_snapshot(
+    self,
+    domain=None,
+    snapshot_name=None,
+    export_path=None,
+    incremental_parent=None,
+    run_on="primary",
+):
+    @ZKConnection(config)
+    def run_vm_export_snapshot(
+        zkhandler, self, domain, snapshot_name, export_path, incremental_parent=None
+    ):
+        return vm_worker_export_snapshot(
+            zkhandler,
+            self,
+            domain,
+            snapshot_name,
+            export_path,
+            incremental_parent=incremental_parent,
+        )
+
+    return run_vm_export_snapshot(
+        self, domain, snapshot_name, export_path, incremental_parent=incremental_parent
+    )
+
+
+@celery.task(name="vm.import_snapshot", bind=True, routing_key="run_on")
+def vm_import_snapshot(
+    self,
+    domain=None,
+    snapshot_name=None,
+    import_path=None,
+    retain_snapshot=True,
+    run_on="primary",
+):
+    @ZKConnection(config)
+    def run_vm_import_snapshot(
+        zkhandler, self, domain, snapshot_name, import_path, retain_snapshot=True
+    ):
+        return vm_worker_import_snapshot(
+            zkhandler,
+            self,
+            domain,
+            snapshot_name,
+            import_path,
+            retain_snapshot=retain_snapshot,
+        )
+
+    return run_vm_import_snapshot(
+        self, domain, snapshot_name, import_path, retain_snapshot=retain_snapshot
+    )
+
+
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
    self,