diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 7a8cbe35..9ccb7411 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -4015,6 +4015,229 @@ class API_VM_Snapshot_Receive_Config(Resource): api.add_resource(API_VM_Snapshot_Receive_Config, "/vm//snapshot/receive/config") +# /vm//mirror/create +class API_VM_Mirror_Create(Resource): + @RequestParser( + [ + { + "name": "destination_api_uri", + "required": True, + "helptext": "A destination API URI must be specified", + }, + { + "name": "destination_api_key", + "required": True, + "helptext": "A destination API key must be specified", + }, + { + "name": "destination_api_verify_ssl", + "required": False, + }, + { + "name": "destination_storage_pool", + "required": False, + }, + ] + ) + @Authenticator + def post(self, vm, reqargs): + """ + Create (or update) a snapshot mirror of a VM to a remote cluster + + This method handles both the creation of a new VM snapshot, as well as sending that snapshot to a remote cluster, creating or updating a VM mirror. It will also automatically handle full vs. incremental block sends if possible based on the available snapshots on both sides. + --- + tags: + - vm + parameters: + - in: query + name: destination_api_uri + type: string + required: true + description: The base API URI of the destination PVC cluster (with prefix if applicable) + - in: query + name: destination_api_key + type: string + required: true + description: The API authentication key of the destination PVC cluster + - in: query + name: destination_api_verify_ssl + type: boolean + required: false + default: true + description: Whether or not to validate SSL certificates for an SSL-enabled destination API + - in: query + name: destination_storage_pool + type: string + required: false + default: source storage pool name + description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool + responses: + 202: + description: Accepted + schema: + type: object + description: The Celery job information of the task + id: CeleryTask + 400: + description: Execution error + schema: + type: object + id: Message + 404: + description: Not found + schema: + type: object + id: Message + """ + destination_api_uri = reqargs.get("destination_api_uri", None) + destination_api_key = reqargs.get("destination_api_key", None) + destination_api_verify_ssl = bool( + strtobool(reqargs.get("destination_api_verify_ssl", "true")) + ) + destination_storage_pool = reqargs.get("destination_storage_pool", None) + + task = run_celery_task( + "vm.create_mirror", + domain=vm, + destination_api_uri=destination_api_uri, + destination_api_key=destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.send_snapshot", + "run_on": f"{get_primary_node()} (primary)", + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) + + +api.add_resource(API_VM_Mirror_Create, "/vm//mirror/create") + + +# /vm//mirror/promote +class API_VM_Mirror_Promote(Resource): + @RequestParser( + [ + { + "name": "destination_api_uri", + "required": True, + "helptext": "A destination API URI must be specified", + }, + { + "name": "destination_api_key", + "required": True, + "helptext": "A destination API key must be specified", + }, + { + "name": "destination_api_verify_ssl", + "required": False, + }, + { + "name": 
"destination_storage_pool", + "required": False, + }, + { + "name": "remove_on_source", + "required": False, + }, + ] + ) + @Authenticator + def post(self, vm, reqargs): + """ + Promote a snapshot mirror of a VM on a remote cluster, flipping "mirror" state, and optionally removing the source VM on this cluster. + + This method handles shutting down the VM on this cluster, creating a new VM snapshot, sending that snapshot to a remote cluster, then starting up the VM on the remote cluster. It will also automatically handle full vs. incremental block sends if possible based on the available snapshots on both sides. + + NOTE: This method may be used alone to perform a one-shot cross-cluster move; creating the mirror first is not required, though doing so will improve performance by allowing an incremental block send. + --- + tags: + - vm + parameters: + - in: query + name: destination_api_uri + type: string + required: true + description: The base API URI of the destination PVC cluster (with prefix if applicable) + - in: query + name: destination_api_key + type: string + required: true + description: The API authentication key of the destination PVC cluster + - in: query + name: destination_api_verify_ssl + type: boolean + required: false + default: true + description: Whether or not to validate SSL certificates for an SSL-enabled destination API + - in: query + name: destination_storage_pool + type: string + required: false + default: source storage pool name + description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool + - in: query + name: remove_on_source + required: false + default: false + description: Remove the VM on the source cluster once promoted (performs a full move between clusters) + responses: + 202: + description: Accepted + schema: + type: object + description: The Celery job information of the task + id: CeleryTask + 400: + description: Execution error + schema: + type: object + id: Message + 404: + description: Not found + schema: + type: object + id: Message + """ + destination_api_uri = reqargs.get("destination_api_uri", None) + destination_api_key = reqargs.get("destination_api_key", None) + destination_api_verify_ssl = bool( + strtobool(reqargs.get("destination_api_verify_ssl", "true")) + ) + destination_storage_pool = reqargs.get("destination_storage_pool", None) + remove_on_source = reqargs.get("remove_on_source", False) + + task = run_celery_task( + "vm.promote_mirror", + domain=vm, + destination_api_uri=destination_api_uri, + destination_api_key=destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + remove_on_source=remove_on_source, + run_on="primary", + ) + + return ( + { + "task_id": task.id, + "task_name": "vm.send_snapshot", + "run_on": f"{get_primary_node()} (primary)", + }, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) + + +api.add_resource(API_VM_Mirror_Promote, "/vm//mirror/promote") + + # /vm/autobackup class API_VM_Autobackup_Root(Resource): @RequestParser( diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py index a3e788fa..068c061e 100644 --- a/client-cli/pvc/cli/cli.py +++ b/client-cli/pvc/cli/cli.py @@ -2078,7 +2078,7 @@ def cli_vm_snapshot_send( Incremental sends are possible by specifying the "-i"/"--incremental-parent" option along with a parent snapshot name. To correctly receive, that parent snapshot must exist on DESTINATION. 
Subsequent sends after the first do not have to be incremental, but an incremental send is likely to perform better than a full send if the VM experiences few writes. - WARNING: Once sent, the VM will be in the state "mirror" on the remote cluster. If it is subsequently started, for instance for disaster recovery, a new snapshot must be taken on the remote side and sent back or data will be inconsistent between the instances. Only VMs in the "mirror" state can accept new sends. + WARNING: Once sent, the VM will be in the state "mirror" on the destination cluster. If it is subsequently started, for instance for disaster recovery, a new snapshot must be taken on the destination cluster and sent back or data will be inconsistent between the instances. Only VMs in the "mirror" state can accept new sends. WARNING: This functionality has no automatic backout on the remote side. While a properly configured cluster should not fail any step in the process, a situation like an intermittent network connection might cause a failure which would have to be manually corrected on that side, usually by removing the mirrored VM and retrying, or rolling back to a previous snapshot and retrying. Future versions may enhance automatic recovery, but for now this would be up to the administrator. """ @@ -2114,6 +2114,199 @@ def cli_vm_snapshot_send( finish(retcode, retmsg) +############################################################################### +# > pvc vm mirror +############################################################################### +@click.group( + name="mirror", + short_help="Manage snapshot mirrors for PVC VMs.", + context_settings=CONTEXT_SETTINGS, +) +def cli_vm_mirror(): + """ + Manage snapshot mirrors of VMs in a PVC cluster. + """ + pass + + +############################################################################### +# > pvc vm mirror create +############################################################################### +@click.command( + name="create", + short_help="Create a snapshot mirror of a virtual machine to another PVC cluster.", +) +@connection_req +@click.argument("domain") +@click.argument("destination") +@click.option( + "-k", + "--destination-api-key", + "destination_api_key", + default=None, + help="The API key of the destination cluster when specifying an API URI.", +) +@click.option( + "-p", + "--destination-pool", + "destination_storage_pool", + default=None, + help="The target storage pool on the destination cluster, if it differs from the source pool.", +) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress if waiting", +) +def cli_vm_mirror_create( + domain, + destination, + destination_api_key, + destination_storage_pool, + wait_flag, +): + """ + For the virtual machine DOMAIN: create a new snapshot (dated), and send snapshot to the remote PVC cluster DESTINATION; creates a cross-cluster snapshot mirror of the VM. + + DOMAIN may be a UUID or name. DESTINATION may be either a configured PVC connection name in this CLI instance (i.e. a valid argument to "--connection"), or a full API URI, including the scheme, port and API prefix; if using the latter, an API key can be specified with the "-k"/"--destination-api-key" option. + + The send will include the VM configuration, metainfo, and a point-in-time snapshot of all attached RBD volumes. + + This command may be used repeatedly to send new updates for a remote VM mirror. 
If a valid shared snapshot is found on the destination cluster, block device transfers will be incremental based on that snapshot. + + By default, the storage pool of the sending cluster will be used at the destination cluster as well. If a pool of that name does not exist, specify one with the "-p"/"--destination-pool" option. + + WARNING: Once sent, the VM will be in the state "mirror" on the destination cluster. If it is subsequently started, for instance for disaster recovery, a new snapshot must be taken on the destination cluster and sent back or data will be inconsistent between the instances. Only VMs in the "mirror" state can accept new sends. Consider using "mirror promote" instead of any manual promotion attempts. + + WARNING: This functionality has no automatic backout on the remote side. While a properly configured cluster should not fail any step in the process, a situation like an intermittent network connection might cause a failure which would have to be manually corrected on that side, usually by removing the mirrored VM and retrying, or rolling back to a previous snapshot and retrying. Future versions may enhance automatic recovery, but for now this would be up to the administrator. + """ + + connections_config = get_store(CLI_CONFIG["store_path"]) + if destination in connections_config.keys(): + destination_cluster_config = connections_config[destination] + destination_api_uri = "{}://{}:{}{}".format( + destination_cluster_config["scheme"], + destination_cluster_config["host"], + destination_cluster_config["port"], + CLI_CONFIG["api_prefix"], + ) + destination_api_key = destination_cluster_config["api_key"] + else: + destination_api_uri = destination + destination_api_key = destination_api_key + + retcode, retmsg = pvc.lib.vm.vm_create_mirror( + CLI_CONFIG, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=CLI_CONFIG.get("verify_ssl"), + destination_storage_pool=destination_storage_pool, + wait_flag=wait_flag, + ) + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) + finish(retcode, retmsg) + + +############################################################################### +# > pvc vm mirror promote +############################################################################### +@click.command( + name="promote", + short_help="Shut down, create a snapshot mirror, and promote a virtual machine to another PVC cluster.", +) +@connection_req +@click.argument("domain") +@click.argument("destination") +@click.option( + "-k", + "--destination-api-key", + "destination_api_key", + default=None, + help="The API key of the destination cluster when specifying an API URI.", +) +@click.option( + "-p", + "--destination-pool", + "destination_storage_pool", + default=None, + help="The target storage pool on the destination cluster, if it differs from the source pool.", +) +@click.option( + "--remove/--no-remove", + "remove_flag", + is_flag=True, + default=False, + show_default=True, + help="Remove or don't remove the local VM after promoting (if set, performs a cross-cluster move).", +) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress if waiting", +) +def cli_vm_mirror_promote( + domain, + destination, + destination_api_key, + destination_storage_pool, + remove_flag, + wait_flag, +): + """ + For the virtual machine DOMAIN: shut down on this cluster, create a new snapshot (dated), send snapshot to the 
remote PVC cluster DESTINATION, start on DESTINATION, and optionally remove from this cluster; performs a cross-cluster move of the VM, with or without retaining the source as a snapshot mirror. + + DOMAIN may be a UUID or name. DESTINATION may be either a configured PVC connection name in this CLI instance (i.e. a valid argument to "--connection"), or a full API URI, including the scheme, port and API prefix; if using the latter, an API key can be specified with the "-k"/"--destination-api-key" option. + + The send will include the VM configuration, metainfo, and a point-in-time snapshot of all attached RBD volumes. + + If a valid shared snapshot is found on the destination cluster, block device transfers will be incremental based on that snapshot. + + By default, the storage pool of the sending cluster will be used at the destination cluster as well. If a pool of that name does not exist, specify one with the "-p"/"--destination-pool" option. + + WARNING: Once promoted, if the "--remove" flag is not set, the VM will be in the state "mirror" on this cluster. This effectively flips which cluster is the "primary" for this VM, and subsequent mirror management commands must be run against the destination cluster instead of this cluster. If the "--remove" flag is set, the VM will be removed from this cluster entirely once successfully started on the destination cluster. + + WARNING: This functionality has no automatic backout on the remote side. While a properly configured cluster should not fail any step in the process, a situation like an intermittent network connection might cause a failure which would have to be manually corrected on that side, usually by removing the mirrored VM and retrying, or rolling back to a previous snapshot and retrying. Future versions may enhance automatic recovery, but for now this would be up to the administrator. 
+ """ + + connections_config = get_store(CLI_CONFIG["store_path"]) + if destination in connections_config.keys(): + destination_cluster_config = connections_config[destination] + destination_api_uri = "{}://{}:{}{}".format( + destination_cluster_config["scheme"], + destination_cluster_config["host"], + destination_cluster_config["port"], + CLI_CONFIG["api_prefix"], + ) + destination_api_key = destination_cluster_config["api_key"] + else: + destination_api_uri = destination + destination_api_key = destination_api_key + + retcode, retmsg = pvc.lib.vm.vm_promote_mirror( + CLI_CONFIG, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=CLI_CONFIG.get("verify_ssl"), + destination_storage_pool=destination_storage_pool, + remove_on_source=remove_flag, + wait_flag=wait_flag, + ) + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) + finish(retcode, retmsg) + + ############################################################################### # > pvc vm backup ############################################################################### @@ -6686,6 +6879,9 @@ cli_vm_snapshot.add_command(cli_vm_snapshot_export) cli_vm_snapshot.add_command(cli_vm_snapshot_import) cli_vm_snapshot.add_command(cli_vm_snapshot_send) cli_vm.add_command(cli_vm_snapshot) +cli_vm_mirror.add_command(cli_vm_mirror_create) +cli_vm_mirror.add_command(cli_vm_mirror_promote) +cli_vm.add_command(cli_vm_mirror) cli_vm_backup.add_command(cli_vm_backup_create) cli_vm_backup.add_command(cli_vm_backup_restore) cli_vm_backup.add_command(cli_vm_backup_remove) diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py index e06065ce..2180c0e6 100644 --- a/client-cli/pvc/lib/vm.py +++ b/client-cli/pvc/lib/vm.py @@ -607,11 +607,11 @@ def vm_send_snapshot( wait_flag=True, ): """ - Send an (existing) snapshot of a VM's disks andconfiguration to a destination PVC cluster, optionally + Send an (existing) snapshot of a VM's disks and configuration to a destination PVC cluster, optionally incremental with incremental_parent API endpoint: POST /vm/{vm}/snapshot/send - API arguments: snapshot_name=snapshot_name, destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, incremental_parent=incremental_parent + API arguments: snapshot_name=snapshot_name, destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, incremental_parent=incremental_parent, destination_storage_pool=destination_storage_pool API schema: {"message":"{data}"} """ params = { @@ -632,6 +632,70 @@ def vm_send_snapshot( return get_wait_retdata(response, wait_flag) +def vm_create_mirror( + config, + vm, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=True, + destination_storage_pool=None, + wait_flag=True, +): + """ + Create a new snapshot and send the snapshot to a destination PVC cluster, with automatic incremental handling + + API endpoint: POST /vm/{vm}/mirror/create + API arguments: destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, destination_storage_pool=destination_storage_pool + API schema: {"message":"{data}"} + """ + params = { + "destination_api_uri": destination_api_uri, + "destination_api_key": destination_api_key, + "destination_api_verify_ssl": destination_api_verify_ssl, + } + if destination_storage_pool is not None: + params["destination_storage_pool"] = destination_storage_pool + + 
response = call_api( + config, "post", "/vm/{vm}/mirror/create".format(vm=vm), params=params + ) + + return get_wait_retdata(response, wait_flag) + + +def vm_promote_mirror( + config, + vm, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=True, + destination_storage_pool=None, + remove_on_source=False, + wait_flag=True, +): + """ + Shut down a VM, create a new snapshot, send the snapshot to a destination PVC cluster, start the VM on the remote cluster, and optionally remove the local VM, with automatic incremental handling + + API endpoint: POST /vm/{vm}/mirror/promote + API arguments: destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, destination_storage_pool=destination_storage_pool, remove_on_source=remove_on_source + API schema: {"message":"{data}"} + """ + params = { + "destination_api_uri": destination_api_uri, + "destination_api_key": destination_api_key, + "destination_api_verify_ssl": destination_api_verify_ssl, + "remove_on_source": remove_on_source, + } + if destination_storage_pool is not None: + params["destination_storage_pool"] = destination_storage_pool + + response = call_api( + config, "post", "/vm/{vm}/mirror/promote".format(vm=vm), params=params + ) + + return get_wait_retdata(response, wait_flag) + + def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_flag=True): """ Perform a cluster VM autobackup diff --git a/daemon-common/vm.py b/daemon-common/vm.py index df1d7730..f4f2c80e 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -24,6 +24,8 @@ import re import os.path import lxml.objectify import lxml.etree +import rados +import requests from concurrent.futures import ThreadPoolExecutor from datetime import datetime @@ -33,6 +35,8 @@ from json import load as jload from json import loads as jloads from libvirt import open as lvopen from os import scandir +from packaging.version import parse as parse_version +from rbd import Image as RBDImage from shutil import rmtree from socket import gethostname from uuid import UUID @@ -3132,10 +3136,6 @@ def vm_worker_send_snapshot( incremental_parent=None, destination_storage_pool=None, ): - import requests - import rados - import rbd - from packaging.version import parse as parse_version current_stage = 0 total_stages = 1 @@ -3156,7 +3156,11 @@ def vm_worker_send_snapshot( return False # Get our side's VM configuration details - vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] + try: + vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] + except KeyError: + vm_detail = None + if not isinstance(vm_detail, dict): fail( celery, @@ -3264,13 +3268,13 @@ def vm_worker_send_snapshot( params=None, data=None, ) - destination_vm_status = response.json() - if type(destination_vm_status) is list and len(destination_vm_status) > 0: - destination_vm_status = destination_vm_status[0] + destination_vm_detail = response.json() + if type(destination_vm_detail) is list and len(destination_vm_detail) > 0: + destination_vm_detail = destination_vm_detail[0] else: - destination_vm_status = {} + destination_vm_detail = {} - current_destination_vm_state = destination_vm_status.get("state", None) + current_destination_vm_state = destination_vm_detail.get("state", None) if ( current_destination_vm_state is not None and current_destination_vm_state != "mirror" @@ -3321,7 +3325,7 @@ def vm_worker_send_snapshot( snapshot_rbdsnaps = snapshot_rbdsnaps.split(",") # Get details 
about remote VM snapshots - destination_vm_snapshots = destination_vm_status.get("snapshots", []) + destination_vm_snapshots = destination_vm_detail.get("snapshots", []) # Check if this snapshot is in the remote list already if snapshot_name in [s["name"] for s in destination_vm_snapshots]: @@ -3467,7 +3471,7 @@ def vm_worker_send_snapshot( cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") cluster.connect() ioctx = cluster.open_ioctx(pool) - image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True) + image = RBDImage(ioctx, name=volume, snapshot=snapshot_name, read_only=True) size = image.size() chunk_size_mb = 1024 @@ -3676,3 +3680,1251 @@ def vm_worker_send_snapshot( current=current_stage, total=total_stages, ) + + +def vm_worker_create_mirror( + zkhandler, + celery, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl, + destination_storage_pool, +): + now = datetime.now() + snapshot_name = now.strftime("%Y%m%d%H%M%S") + + current_stage = 0 + total_stages = 1 + start( + celery, + f"Creating mirror of VM '{domain}' to cluster '{destination_api_uri}'", + current=current_stage, + total=total_stages, + ) + + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return False + + current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid)) + if current_snapshots and snapshot_name in current_snapshots: + # This should never actually happen since snapshots with mirror are dated, but worth + # checking just in case someone tries to be sneaky + fail( + celery, + f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!", + ) + return False + + # Get our side's VM configuration details + try: + vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] + except KeyError: + vm_detail = None + + if not isinstance(vm_detail, dict): + fail( + celery, + f"VM listing returned invalid data: {vm_detail}", + ) + return False + + vm_name = vm_detail["name"] + + # Validate that the destination cluster can be reached + destination_api_timeout = (3.05, 172800) + destination_api_headers = { + "X-Api-Key": destination_api_key, + } + + session = requests.Session() + session.headers.update(destination_api_headers) + session.verify = destination_api_verify_ssl + session.timeout = destination_api_timeout + + try: + # Hit the API root; this should return "PVC API version x" + response = session.get( + f"{destination_api_uri}/", + timeout=destination_api_timeout, + params=None, + data=None, + ) + if "PVC API" not in response.json().get("message"): + raise ValueError("Remote API is not a PVC API or incorrect URI given") + except requests.exceptions.ConnectionError as e: + fail( + celery, + f"Connection to remote API timed out: {e}", + ) + return False + except ValueError as e: + fail( + celery, + f"Connection to remote API is not valid: {e}", + ) + return False + except Exception as e: + fail( + celery, + f"Connection to remote API failed: {e}", + ) + return False + + # Hit the API "/status" endpoint to validate API key and cluster status + response = session.get( + f"{destination_api_uri}/status", + params=None, + data=None, + ) + destination_cluster_status = response.json() + current_destination_pvc_version = destination_cluster_status.get( + "pvc_version", None + ) + if current_destination_pvc_version is None: + fail( + celery, + "Connection to remote API failed: no PVC version information returned", + ) + 
return False + + expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed + # Work around development versions + current_destination_pvc_version = re.sub( + r"~git-.*", "", current_destination_pvc_version + ) + # Compare versions + if parse_version(current_destination_pvc_version) < parse_version( + expected_destination_pvc_version + ): + fail( + celery, + f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher", + ) + return False + + # Check if the VM already exists on the remote + response = session.get( + f"{destination_api_uri}/vm/{domain}", + params=None, + data=None, + ) + destination_vm_detail = response.json() + if type(destination_vm_detail) is list and len(destination_vm_detail) > 0: + destination_vm_detail = destination_vm_detail[0] + else: + destination_vm_detail = {} + + current_destination_vm_state = destination_vm_detail.get("state", None) + if ( + current_destination_vm_state is not None + and current_destination_vm_state != "mirror" + ): + fail( + celery, + "Remote PVC VM exists and is not a mirror", + ) + return False + + # Get the list of all RBD volumes + rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") + + # Snapshot creation stages + total_stages += 1 + len(rbd_list) + # Snapshot sending stages + total_stages = 2 + (3 * len(rbd_list)) + + # + # 1. Create snapshot + # + + snap_list = list() + + # If a snapshot fails, clean up any snapshots that were successfuly created + def cleanup_failure(): + for snapshot in snap_list: + rbd, snapshot_name = snapshot.split("@") + pool, volume = rbd.split("/") + # We capture no output here, because if this fails too we're in a deep + # error chain and will just ignore it + ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) + + # Iterrate through and create a snapshot for each RBD volume + for rbd in rbd_list: + current_stage += 1 + update( + celery, + f"Creating RBD snapshot of {rbd}", + current=current_stage, + total=total_stages, + ) + + pool, volume = rbd.split("/") + ret, msg = ceph.add_snapshot( + zkhandler, pool, volume, snapshot_name, zk_only=False + ) + if not ret: + cleanup_failure() + fail( + celery, + msg.replace("ERROR: ", ""), + ) + return False + else: + snap_list.append(f"{pool}/{volume}@{snapshot_name}") + + current_stage += 1 + update( + celery, + "Creating VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + # Get the current timestamp + tstart = time.time() + # Get the current domain XML + vm_config = zkhandler.read(("domain.xml", dom_uuid)) + + # Add the snapshot entry to Zookeeper + zkhandler.write( + [ + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.name", + snapshot_name, + ), + snapshot_name, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.timestamp", + snapshot_name, + ), + tstart, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.xml", + snapshot_name, + ), + vm_config, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.rbd_snapshots", + snapshot_name, + ), + ",".join(snap_list), + ), + ] + ) + + # + # 2. 
Send snapshot to remote + # + + # Determine if there's a valid shared snapshot to send an incremental diff from + local_snapshots = {s["name"] for s in vm_detail["snapshots"]} + remote_snapshots = {s["name"] for s in destination_vm_detail["snapshots"]} + incremental_parent = next( + (s for s in local_snapshots if s in remote_snapshots), None + ) + + current_stage += 1 + update( + celery, + f"Sending VM configuration for {vm_name}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + send_params = { + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + try: + response = session.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", + headers={"Content-Type": "application/json"}, + params=send_params, + json=vm_detail, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send config: {e}", + ) + return False + + # Create the block devices on the remote side if this is a new VM send + block_t_start = time.time() + block_total_mb = 0 + + for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]: + rbd_name = rbd_detail["name"] + pool, volume = rbd_name.split("/") + + current_stage += 1 + update( + celery, + f"Preparing remote volume for {rbd_name}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + # Get the storage volume details + retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False) + if not retcode or len(retdata) != 1: + if len(retdata) < 1: + error_message = f"No detail returned for volume {rbd_name}" + elif len(retdata) > 1: + error_message = f"Multiple details returned for volume {rbd_name}" + else: + error_message = f"Error getting details for volume {rbd_name}" + fail( + celery, + error_message, + ) + return False + + try: + local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"]) + except Exception as e: + error_message = f"Failed to get volume size for {rbd_name}: {e}" + + if destination_storage_pool is not None: + pool = destination_storage_pool + + current_stage += 1 + update( + celery, + f"Checking remote volume {rbd_name} for compliance", + current=current_stage, + total=total_stages, + ) + + # Check if the volume exists on the target + response = session.get( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + params=None, + data=None, + ) + if response.status_code != 404 and current_destination_vm_state is None: + fail( + celery, + f"Remote storage pool {pool} already contains volume {volume}", + ) + return False + + if current_destination_vm_state is not None: + try: + remote_volume_size = ceph.format_bytes_fromhuman( + response.json()[0]["stats"]["size"] + ) + except Exception as e: + error_message = f"Failed to get volume size for remote {rbd_name}: {e}" + fail(celery, error_message) + return False + + if local_volume_size != remote_volume_size: + response = session.put( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + params={"new_size": local_volume_size, "force": True}, + ) + if response.status_code != 200: + fail( + celery, + "Failed to resize remote volume to match local volume", + ) + return False + + # Send the volume to the remote + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) + image = RBDImage(ioctx, name=volume, snapshot=snapshot_name, read_only=True) + size = image.size() + chunk_size_mb = 1024 + + if incremental_parent is not None: + # Diff between incremental_parent and snapshot + 
celery_message = ( + f"Sending diff of {rbd_name}@{incremental_parent} → {snapshot_name}" + ) + else: + # Full image transfer + celery_message = f"Sending full image of {rbd_name}@{snapshot_name}" + + current_stage += 1 + update( + celery, + celery_message, + current=current_stage, + total=total_stages, + ) + + if incremental_parent is not None: + # Createa single session to reuse connections + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + + session.params.update(send_params) + + # Send 32 objects (128MB) at once + send_max_objects = 32 + batch_size_mb = 4 * send_max_objects + batch_size = batch_size_mb * 1024 * 1024 + + total_chunks = 0 + + def diff_cb_count(offset, length, exists): + nonlocal total_chunks + if exists: + total_chunks += 1 + + current_chunk = 0 + buffer = list() + buffer_size = 0 + last_chunk_time = time.time() + + def send_batch_multipart(buffer): + nonlocal last_chunk_time + files = {} + for i in range(len(buffer)): + files[f"object_{i}"] = ( + f"object_{i}", + buffer[i], + "application/octet-stream", + ) + try: + response = session.put( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + files=files, + stream=True, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send diff batch ({e}): {response.json()['message']}", + ) + return False + + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(batch_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, + ) + + def add_block_to_multipart(buffer, offset, length, data): + part_data = ( + offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data + ) # Add header and data + buffer.append(part_data) + + def diff_cb_send(offset, length, exists): + nonlocal current_chunk, buffer, buffer_size + if exists: + # Read the data for the current block + data = image.read(offset, length) + # Add the block to the multipart buffer + add_block_to_multipart(buffer, offset, length, data) + current_chunk += 1 + buffer_size += len(data) + if buffer_size >= batch_size: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size + + try: + image.set_snap(snapshot_name) + image.diff_iterate( + 0, size, incremental_parent, diff_cb_count, whole_object=True + ) + block_total_mb += total_chunks * 4 + image.diff_iterate( + 0, size, incremental_parent, diff_cb_send, whole_object=True + ) + + if buffer: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + else: + + def full_chunker(): + nonlocal block_total_mb + chunk_size = 1024 * 1024 * chunk_size_mb + current_chunk = 0 + last_chunk_time = time.time() + while current_chunk < size: + chunk = image.read(current_chunk, chunk_size) + yield chunk + current_chunk += chunk_size + block_total_mb += len(chunk) / 1024 / 1024 + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(chunk_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + 
total=total_stages, + ) + + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "size": size, + "source_snapshot": incremental_parent, + } + + try: + response = session.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + headers={"Content-Type": "application/octet-stream"}, + params=send_params, + data=full_chunker(), + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + } + try: + response = session.patch( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + params=send_params, + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + + block_t_end = time.time() + block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1) + + current_stage += 1 + return finish( + celery, + f"Successfully created mirror of VM '{domain}' (snapshot '{snapshot_name}') on remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)", + current=current_stage, + total=total_stages, + ) + + +def vm_worker_promote_mirror( + zkhandler, + celery, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl, + destination_storage_pool, + remove_on_source=False, +): + now = datetime.now() + snapshot_name = now.strftime("%Y%m%d%H%M%S") + + current_stage = 0 + total_stages = 1 + start( + celery, + f"Creating mirror of VM '{domain}' to cluster '{destination_api_uri}'", + current=current_stage, + total=total_stages, + ) + + # Validate that VM exists in cluster + dom_uuid = getDomainUUID(zkhandler, domain) + if not dom_uuid: + fail( + celery, + f"Could not find VM '{domain}' in the cluster", + ) + return False + + current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid)) + if current_snapshots and snapshot_name in current_snapshots: + # This should never actually happen since snapshots with mirror are dated, but worth + # checking just in case someone tries to be sneaky + fail( + celery, + f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!", + ) + return False + + # Get our side's VM configuration details + try: + vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0] + except KeyError: + vm_detail = None + + if not isinstance(vm_detail, dict): + fail( + celery, + f"VM listing returned invalid data: {vm_detail}", + ) + return False + + vm_name = vm_detail["name"] + + # Validate that the destination cluster can be reached + destination_api_timeout = (3.05, 172800) + destination_api_headers = { + "X-Api-Key": destination_api_key, + } + + session = requests.Session() + session.headers.update(destination_api_headers) + session.verify = destination_api_verify_ssl + session.timeout = destination_api_timeout + + try: + # Hit the API root; this should return "PVC API version x" + response = session.get( + f"{destination_api_uri}/", + timeout=destination_api_timeout, + params=None, + data=None, + ) + if "PVC API" not in response.json().get("message"): + raise ValueError("Remote API is not a PVC API or incorrect URI given") + except requests.exceptions.ConnectionError as e: + fail( + celery, + f"Connection to remote API timed out: {e}", + ) + return False + except ValueError as e: + 
fail( + celery, + f"Connection to remote API is not valid: {e}", + ) + return False + except Exception as e: + fail( + celery, + f"Connection to remote API failed: {e}", + ) + return False + + # Hit the API "/status" endpoint to validate API key and cluster status + response = session.get( + f"{destination_api_uri}/status", + params=None, + data=None, + ) + destination_cluster_status = response.json() + current_destination_pvc_version = destination_cluster_status.get( + "pvc_version", None + ) + if current_destination_pvc_version is None: + fail( + celery, + "Connection to remote API failed: no PVC version information returned", + ) + return False + + expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed + # Work around development versions + current_destination_pvc_version = re.sub( + r"~git-.*", "", current_destination_pvc_version + ) + # Compare versions + if parse_version(current_destination_pvc_version) < parse_version( + expected_destination_pvc_version + ): + fail( + celery, + f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher", + ) + return False + + # Check if the VM already exists on the remote + response = session.get( + f"{destination_api_uri}/vm/{domain}", + params=None, + data=None, + ) + destination_vm_detail = response.json() + if type(destination_vm_detail) is list and len(destination_vm_detail) > 0: + destination_vm_detail = destination_vm_detail[0] + else: + destination_vm_detail = {} + + current_destination_vm_state = destination_vm_detail.get("state", None) + if ( + current_destination_vm_state is not None + and current_destination_vm_state != "mirror" + ): + fail( + celery, + "Remote PVC VM exists and is not a mirror", + ) + return False + + # Get the list of all RBD volumes + rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",") + + # VM shutdown stages + total_stages += 1 + # Snapshot creation stages + total_stages += 1 + len(rbd_list) + # Snapshot sending stages + total_stages = 2 + (3 * len(rbd_list)) + # Cleanup stages + total_stages += 2 + + # + # 1. Shut down VM + # + + current_stage += 1 + update( + celery, + f"Shutting down VM '{vm_name}'", + current=current_stage, + total=total_stages, + ) + + retcode, retmsg = shutdown_vm(zkhandler, domain, wait=True) + if not retcode: + fail( + celery, + "Failed to shut down VM", + ) + return False + + # + # 2. 
Create snapshot + # + + snap_list = list() + + # If a snapshot fails, clean up any snapshots that were successfuly created + def cleanup_failure(): + for snapshot in snap_list: + rbd, snapshot_name = snapshot.split("@") + pool, volume = rbd.split("/") + # We capture no output here, because if this fails too we're in a deep + # error chain and will just ignore it + ceph.remove_snapshot(zkhandler, pool, volume, snapshot_name) + + # Iterrate through and create a snapshot for each RBD volume + for rbd in rbd_list: + current_stage += 1 + update( + celery, + f"Creating RBD snapshot of {rbd}", + current=current_stage, + total=total_stages, + ) + + pool, volume = rbd.split("/") + ret, msg = ceph.add_snapshot( + zkhandler, pool, volume, snapshot_name, zk_only=False + ) + if not ret: + cleanup_failure() + fail( + celery, + msg.replace("ERROR: ", ""), + ) + return False + else: + snap_list.append(f"{pool}/{volume}@{snapshot_name}") + + current_stage += 1 + update( + celery, + "Creating VM configuration snapshot", + current=current_stage, + total=total_stages, + ) + + # Get the current timestamp + tstart = time.time() + # Get the current domain XML + vm_config = zkhandler.read(("domain.xml", dom_uuid)) + + # Add the snapshot entry to Zookeeper + zkhandler.write( + [ + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.name", + snapshot_name, + ), + snapshot_name, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.timestamp", + snapshot_name, + ), + tstart, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.xml", + snapshot_name, + ), + vm_config, + ), + ( + ( + "domain.snapshots", + dom_uuid, + "domain_snapshot.rbd_snapshots", + snapshot_name, + ), + ",".join(snap_list), + ), + ] + ) + + # + # 2. Send snapshot to remote + # + + # Determine if there's a valid shared snapshot to send an incremental diff from + local_snapshots = {s["name"] for s in vm_detail["snapshots"]} + remote_snapshots = {s["name"] for s in destination_vm_detail["snapshots"]} + incremental_parent = next( + (s for s in local_snapshots if s in remote_snapshots), None + ) + + current_stage += 1 + update( + celery, + f"Sending VM configuration for {vm_name}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + send_params = { + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + try: + response = session.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", + headers={"Content-Type": "application/json"}, + params=send_params, + json=vm_detail, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send config: {e}", + ) + return False + + # Create the block devices on the remote side if this is a new VM send + block_t_start = time.time() + block_total_mb = 0 + + for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]: + rbd_name = rbd_detail["name"] + pool, volume = rbd_name.split("/") + + current_stage += 1 + update( + celery, + f"Preparing remote volume for {rbd_name}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + # Get the storage volume details + retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False) + if not retcode or len(retdata) != 1: + if len(retdata) < 1: + error_message = f"No detail returned for volume {rbd_name}" + elif len(retdata) > 1: + error_message = f"Multiple details returned for volume {rbd_name}" + else: + error_message = f"Error getting details for volume {rbd_name}" + fail( + celery, + error_message, + ) + return 
False + + try: + local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"]) + except Exception as e: + error_message = f"Failed to get volume size for {rbd_name}: {e}" + + if destination_storage_pool is not None: + pool = destination_storage_pool + + current_stage += 1 + update( + celery, + f"Checking remote volume {rbd_name} for compliance", + current=current_stage, + total=total_stages, + ) + + # Check if the volume exists on the target + response = session.get( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + params=None, + data=None, + ) + if response.status_code != 404 and current_destination_vm_state is None: + fail( + celery, + f"Remote storage pool {pool} already contains volume {volume}", + ) + return False + + if current_destination_vm_state is not None: + try: + remote_volume_size = ceph.format_bytes_fromhuman( + response.json()[0]["stats"]["size"] + ) + except Exception as e: + error_message = f"Failed to get volume size for remote {rbd_name}: {e}" + fail(celery, error_message) + return False + + if local_volume_size != remote_volume_size: + response = session.put( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + params={"new_size": local_volume_size, "force": True}, + ) + if response.status_code != 200: + fail( + celery, + "Failed to resize remote volume to match local volume", + ) + return False + + # Send the volume to the remote + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) + image = RBDImage(ioctx, name=volume, snapshot=snapshot_name, read_only=True) + size = image.size() + chunk_size_mb = 1024 + + if incremental_parent is not None: + # Diff between incremental_parent and snapshot + celery_message = ( + f"Sending diff of {rbd_name}@{incremental_parent} → {snapshot_name}" + ) + else: + # Full image transfer + celery_message = f"Sending full image of {rbd_name}@{snapshot_name}" + + current_stage += 1 + update( + celery, + celery_message, + current=current_stage, + total=total_stages, + ) + + if incremental_parent is not None: + # Createa single session to reuse connections + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + + session.params.update(send_params) + + # Send 32 objects (128MB) at once + send_max_objects = 32 + batch_size_mb = 4 * send_max_objects + batch_size = batch_size_mb * 1024 * 1024 + + total_chunks = 0 + + def diff_cb_count(offset, length, exists): + nonlocal total_chunks + if exists: + total_chunks += 1 + + current_chunk = 0 + buffer = list() + buffer_size = 0 + last_chunk_time = time.time() + + def send_batch_multipart(buffer): + nonlocal last_chunk_time + files = {} + for i in range(len(buffer)): + files[f"object_{i}"] = ( + f"object_{i}", + buffer[i], + "application/octet-stream", + ) + try: + response = session.put( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + files=files, + stream=True, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send diff batch ({e}): {response.json()['message']}", + ) + return False + + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(batch_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, + ) + + def add_block_to_multipart(buffer, offset, length, data): + part_data = ( + offset.to_bytes(8, 
"big") + length.to_bytes(8, "big") + data + ) # Add header and data + buffer.append(part_data) + + def diff_cb_send(offset, length, exists): + nonlocal current_chunk, buffer, buffer_size + if exists: + # Read the data for the current block + data = image.read(offset, length) + # Add the block to the multipart buffer + add_block_to_multipart(buffer, offset, length, data) + current_chunk += 1 + buffer_size += len(data) + if buffer_size >= batch_size: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size + + try: + image.set_snap(snapshot_name) + image.diff_iterate( + 0, size, incremental_parent, diff_cb_count, whole_object=True + ) + block_total_mb += total_chunks * 4 + image.diff_iterate( + 0, size, incremental_parent, diff_cb_send, whole_object=True + ) + + if buffer: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + else: + + def full_chunker(): + nonlocal block_total_mb + chunk_size = 1024 * 1024 * chunk_size_mb + current_chunk = 0 + last_chunk_time = time.time() + while current_chunk < size: + chunk = image.read(current_chunk, chunk_size) + yield chunk + current_chunk += chunk_size + block_total_mb += len(chunk) / 1024 / 1024 + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(chunk_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, + ) + + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "size": size, + "source_snapshot": incremental_parent, + } + + try: + response = session.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + headers={"Content-Type": "application/octet-stream"}, + params=send_params, + data=full_chunker(), + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + } + try: + response = session.patch( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + params=send_params, + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + + block_t_end = time.time() + block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1) + + # + # 4. Start VM on remote + # + + current_stage += 1 + update( + celery, + f"Starting VM '{vm_name}' on remote cluster", + current=current_stage, + total=total_stages, + ) + + try: + response = session.post( + f"{destination_api_uri}/vm/{vm_name}/state", + headers={"Content-Type": "application/octet-stream"}, + params={"state": "start", "wait": True}, + data=full_chunker(), + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + + # 5. 
Set mirror state OR remove VM + if remove_on_source: + current_stage += 1 + update( + celery, + f"Removing VM '{vm_name}' from local cluster", + current=current_stage, + total=total_stages, + ) + + retcode, retmsg = remove_vm(zkhandler, domain) + else: + current_stage += 1 + update( + celery, + f"Setting VM '{vm_name}' state to mirror on local cluster", + current=current_stage, + total=total_stages, + ) + + change_state(zkhandler, dom_uuid, "mirror") + + current_stage += 1 + return finish( + celery, + f"Successfully promoted VM '{domain}' (snapshot '{snapshot_name}') on remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)", + current=current_stage, + total=total_stages, + ) diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py index 20fa6df4..d8a1564d 100755 --- a/worker-daemon/pvcworkerd/Daemon.py +++ b/worker-daemon/pvcworkerd/Daemon.py @@ -34,6 +34,8 @@ from daemon_lib.vm import ( vm_worker_export_snapshot, vm_worker_import_snapshot, vm_worker_send_snapshot, + vm_worker_create_mirror, + vm_worker_promote_mirror, ) from daemon_lib.ceph import ( osd_worker_add_osd, @@ -276,6 +278,90 @@ def vm_send_snapshot( ) +@celery.task(name="vm.create_mirror", bind=True, routing_key="run_on") +def vm_create_mirror( + self, + domain=None, + destination_api_uri="", + destination_api_key="", + destination_api_verify_ssl=True, + destination_storage_pool=None, + run_on="primary", +): + @ZKConnection(config) + def run_vm_create_mirror( + zkhandler, + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=True, + destination_storage_pool=None, + ): + return vm_worker_create_mirror( + zkhandler, + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + ) + + return run_vm_create_mirror( + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + ) + + +@celery.task(name="vm.promote_mirror", bind=True, routing_key="run_on") +def vm_promote_mirror( + self, + domain=None, + destination_api_uri="", + destination_api_key="", + destination_api_verify_ssl=True, + destination_storage_pool=None, + remove_on_source=False, + run_on="primary", +): + @ZKConnection(config) + def run_vm_promote_mirror( + zkhandler, + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=True, + destination_storage_pool=None, + remove_on_source=False, + ): + return vm_worker_promote_mirror( + zkhandler, + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + remove_on_source=remove_on_source, + ) + + return run_vm_promote_mirror( + self, + domain, + destination_api_uri, + destination_api_key, + destination_api_verify_ssl=destination_api_verify_ssl, + destination_storage_pool=destination_storage_pool, + remove_on_source=remove_on_source, + ) + + @celery.task(name="osd.add", bind=True, routing_key="run_on") def osd_add( self,
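For reference, a minimal sketch of how an external client might drive the two new mirror endpoints added by this patch. The endpoint paths, query parameter names, and the X-Api-Key header are taken from the changes above; the cluster addresses, API keys, VM name "web01", and pool name "vms" are purely illustrative assumptions, not part of the patch.

    # Illustrative sketch only; all hosts, keys, the VM name, and the pool name are hypothetical.
    import requests

    source_api = "https://pvc1.example.com:7370/api/v1"
    headers = {"X-Api-Key": "SOURCE-CLUSTER-API-KEY"}

    # Create (or update) a mirror of VM "web01" on the destination cluster
    resp = requests.post(
        f"{source_api}/vm/web01/mirror/create",
        headers=headers,
        params={
            "destination_api_uri": "https://pvc2.example.com:7370/api/v1",
            "destination_api_key": "DEST-CLUSTER-API-KEY",
            "destination_api_verify_ssl": "true",
            "destination_storage_pool": "vms",  # optional; defaults to the source pool name
        },
    )
    task = resp.json()  # 202 Accepted; poll the returned Celery task_id for progress

    # Later, promote the mirror and remove the source copy (a full cross-cluster move)
    resp = requests.post(
        f"{source_api}/vm/web01/mirror/promote",
        headers=headers,
        params={
            "destination_api_uri": "https://pvc2.example.com:7370/api/v1",
            "destination_api_key": "DEST-CLUSTER-API-KEY",
            "remove_on_source": "true",
        },
    )

The equivalent CLI workflow defined in this patch is "pvc vm mirror create" followed by "pvc vm mirror promote --remove", which performs the same create-then-promote sequence through the configured connection.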