diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index 571053a7..9cc0ca50 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -3572,6 +3572,129 @@ class API_VM_Snapshot_Import(Resource):
 api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
 
 
+# /vm/<vm>/snapshot/send
+class API_VM_Snapshot_Send(Resource):
+    @RequestParser(
+        [
+            {
+                "name": "snapshot_name",
+                "required": True,
+                "helptext": "A snapshot name must be specified",
+            },
+            {
+                "name": "destination_api_uri",
+                "required": True,
+                "helptext": "A destination API URI must be specified",
+            },
+            {
+                "name": "destination_api_key",
+                "required": True,
+                "helptext": "A destination API key must be specified",
+            },
+            {
+                "name": "destination_api_verify_ssl",
+                "required": False,
+            },
+            {
+                "name": "incremental_parent",
+                "required": False,
+            },
+            {
+                "name": "destination_storage_pool",
+                "required": False,
+            },
+        ]
+    )
+    @Authenticator
+    def post(self, vm, reqargs):
+        """
+        Send a snapshot of a VM's disks and configuration to another PVC cluster
+        ---
+        tags:
+          - vm
+        parameters:
+          - in: query
+            name: snapshot_name
+            type: string
+            required: true
+            description: The name of the snapshot to export (must exist)
+          - in: query
+            name: destination_api_uri
+            type: string
+            required: true
+            description: The base API URI of the destination PVC cluster (with prefix if applicable)
+          - in: query
+            name: destination_api_key
+            type: string
+            required: true
+            description: The API authentication key of the destination PVC cluster
+          - in: query
+            name: destination_api_verify_ssl
+            type: boolean
+            required: false
+            default: true
+            description: Whether or not to validate SSL certificates for an SSL-enabled destination API
+          - in: query
+            name: incremental_parent
+            type: string
+            required: false
+            description: A snapshot name to generate an incremental diff from; a full send is performed if unset
+          - in: query
+            name: destination_storage_pool
+            type: string
+            required: false
+            default: source storage pool name
+            description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
+        responses:
+          202:
+            description: Accepted
+            schema:
+              type: string
+              description: The Celery job ID of the task
+          400:
+            description: Execution error
+            schema:
+              type: object
+              id: Message
+          404:
+            description: Not found
+            schema:
+              type: object
+              id: Message
+        """
+        snapshot_name = reqargs.get("snapshot_name", None)
+        destination_api_uri = reqargs.get("destination_api_uri", None)
+        destination_api_key = reqargs.get("destination_api_key", None)
+        destination_api_verify_ssl = bool(
+            strtobool(reqargs.get("destination_api_verify_ssl", "true"))
+        )
+        incremental_parent = reqargs.get("incremental_parent", None)
+        destination_storage_pool = reqargs.get("destination_storage_pool", None)
+
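+        # Run the snapshot send as a Celery task on the current primary coordinator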
+        task = run_celery_task(
+            "vm.send_snapshot",
+            domain=vm,
+            snapshot_name=snapshot_name,
+            destination_api_uri=destination_api_uri,
+            destination_api_key=destination_api_key,
+            destination_api_verify_ssl=destination_api_verify_ssl,
+            incremental_parent=incremental_parent,
+            destination_storage_pool=destination_storage_pool,
+            run_on="primary",
+        )
+
+        return (
+            {
+                "task_id": task.id,
+                "task_name": "vm.send_snapshot",
+                "run_on": f"{get_primary_node()} (primary)",
+            },
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
+        )
+
+
+api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
+
+
 # /vm/<vm>/snapshot/receive/block
 class API_VM_Snapshot_Receive_Block(Resource):
     @RequestParser(
diff --git a/daemon-common/vm.py b/daemon-common/vm.py
index 88f733cf..0cc9bd4a 100644
--- a/daemon-common/vm.py
+++ b/daemon-common/vm.py
@@ -24,6 +24,7 @@ import re
 import os.path
 import lxml.objectify
 import lxml.etree
+import subprocess
 
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
@@ -3119,3 +3120,389 @@ def vm_worker_import_snapshot(
         current=current_stage,
         total=total_stages,
     )
+
+
+def vm_worker_send_snapshot(
+    zkhandler,
+    celery,
+    domain,
+    snapshot_name,
+    destination_api_uri,
+    destination_api_key,
+    destination_api_verify_ssl=True,
+    incremental_parent=None,
+    destination_storage_pool=None,
+):
+    import requests
+    import rados
+    import rbd
+    from packaging.version import parse as parse_version
+
+    current_stage = 0
+    total_stages = 1
+    start(
+        celery,
+        f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
+        current=current_stage,
+        total=total_stages,
+    )
+
+    # Validate that VM exists in cluster
+    dom_uuid = getDomainUUID(zkhandler, domain)
+    if not dom_uuid:
+        fail(
+            celery,
+            f"Could not find VM '{domain}' in the cluster",
+        )
+        return False
+
+    # Get our side's VM configuration details
+    vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
+    if not isinstance(vm_detail, dict):
+        fail(
+            celery,
+            f"VM listing returned invalid data: {vm_detail}",
+        )
+        return False
+
+    # Check if the snapshot exists
+    if not zkhandler.exists(
+        ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
+    ):
+        fail(
+            celery,
+            f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
+        )
+        return False
+
+    # Check if the incremental parent exists
+    if incremental_parent is not None and not zkhandler.exists(
+        ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
+    ):
+        fail(
+            celery,
+            f"Could not find snapshot '{incremental_parent}' of VM '{domain}'",
+        )
+        return False
+
+    # Validate that the destination cluster can be reached
+    destination_api_timeout = (3.05, 172800)
+    destination_api_headers = {
+        "X-Api-Key": destination_api_key,
+        "Content-Type": "application/octet-stream",
+    }
+
+    try:
+        # Hit the API root; this should return "PVC API version x"
+        response = requests.get(
+            f"{destination_api_uri}/",
+            timeout=destination_api_timeout,
+            headers=None,
+            params=None,
+            data=None,
+            verify=destination_api_verify_ssl,
+        )
+        if "PVC API" not in response.json().get("message"):
+            raise ValueError("Remote API is not a PVC API or incorrect URI given")
+    except requests.exceptions.ConnectionError as e:
+        fail(
+            celery,
+            f"Connection to remote API timed out: {e}",
+        )
+        return False
+    except ValueError as e:
+        fail(
+            celery,
+            f"Connection to remote API is not valid: {e}",
+        )
+        return False
+    except Exception as e:
+        fail(
+            celery,
+            f"Connection to remote API failed: {e}",
+        )
+        return False
+
+    # Hit the API "/status" endpoint to validate API key and cluster status
+    response = requests.get(
+        f"{destination_api_uri}/status",
+        timeout=destination_api_timeout,
+        headers=destination_api_headers,
+        params=None,
+        data=None,
+        verify=destination_api_verify_ssl,
+    )
+    destination_cluster_status = response.json()
+    current_destination_pvc_version = destination_cluster_status.get(
+        "pvc_version", None
+    )
+    if current_destination_pvc_version is None:
+        fail(
+            celery,
+            "Connection to remote API failed: no PVC version information returned",
+        )
+        return False
+
+    expected_destination_pvc_version = "0.9.100"  # TODO: 0.9.101 when completed
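+    # Verify the remote cluster is new enough to support receiving snapshots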
+    if parse_version(current_destination_pvc_version) < parse_version(
+        expected_destination_pvc_version
+    ):
+        fail(
+            celery,
+            f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
+        )
+        return False
+
+    # Check if the VM already exists on the remote
+    response = requests.get(
+        f"{destination_api_uri}/vm/{domain}",
+        timeout=destination_api_timeout,
+        headers=destination_api_headers,
+        params=None,
+        data=None,
+        verify=destination_api_verify_ssl,
+    )
+    destination_vm_status = response.json()
+    current_destination_vm_state = destination_vm_status.get("state", None)
+    if (
+        current_destination_vm_state is not None
+        and current_destination_vm_state != "mirror"
+    ):
+        fail(
+            celery,
+            "Remote PVC VM exists and is not a mirror",
+        )
+        return False
+
+    # Get details about VM snapshot
+    _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
+        [
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.name",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.timestamp",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.xml",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.rbd_snapshots",
+                    snapshot_name,
+                )
+            ),
+        ]
+    )
+    snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
+
+    # Get details about remote VM snapshots
+    destination_vm_snapshots = destination_vm_status.get("snapshots", [])
+
+    # Check if this snapshot is in the remote list already
+    if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
+        fail(
+            celery,
+            f"Snapshot {snapshot_name} already exists on the target",
+        )
+        return False
+
+    # Check if this snapshot is older than the latest remote VM snapshot
+    if len(destination_vm_snapshots) > 0:
+        latest_destination_vm_snapshot = destination_vm_snapshots[0]
+        if snapshot_timestamp < latest_destination_vm_snapshot["timestamp"]:
+            fail(
+                celery,
+                f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
+            )
+            return False
+
+    # Check that our incremental parent exists on the remote VM
+    if incremental_parent is not None:
+        if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
+            fail(
+                celery,
+                f"Cannot send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
+            )
+            return False
+
+    # Set send type
+    send_type = "incremental" if incremental_parent is not None else "full"
+
+    # Begin send, set stages
+    total_stages = (
+        2
+        + (2 * len(snapshot_rbdsnaps))
+        + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
+    )
+
+    # Create the block devices on the remote side if this is a new VM send
+    for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
+        rbd_name = rbd_detail["name"]
+        pool, volume = rbd_name.split("/")
+
+        current_stage += 1
+        update(
+            celery,
+            f"Preparing remote volume for {rbd_name}@{snapshot_name}",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        # Get the storage volume details
+        retcode, retdata = ceph.get_list_volume(
+            zkhandler, pool, volume, is_fuzzy=False
+        )
+        if not retcode or len(retdata) != 1:
+            if len(retdata) < 1:
+                error_message = f"No detail returned for volume {rbd_name}"
+            elif len(retdata) > 1:
+                error_message = f"Multiple details returned for volume {rbd_name}"
+            else:
+                error_message = f"Error getting details for volume {rbd_name}"
+            fail(
+                celery,
+                error_message,
+            )
+            return False
+
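+        # Determine the source volume size so the remote volume can be created to match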
+        try:
+            size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
+        except Exception as e:
+            fail(
+                celery,
+                f"Failed to get volume size for {rbd_name}: {e}",
+            )
+            return False
+
+        if destination_storage_pool is not None:
+            destination_pool = destination_storage_pool
+        else:
+            destination_pool = pool
+
+        if current_destination_vm_state is None:
+            current_stage += 1
+            update(
+                celery,
+                f"Creating remote volume {destination_pool}/{volume} for {rbd_name}@{snapshot_name}",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            # Check if the volume exists on the target
+            response = requests.get(
+                f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
+                timeout=destination_api_timeout,
+                headers=destination_api_headers,
+                params=None,
+                data=None,
+                verify=destination_api_verify_ssl,
+            )
+            if response.status_code != 404:
+                fail(
+                    celery,
+                    f"Remote storage pool {destination_pool} already contains volume {volume}",
+                )
+                return False
+
+            # Create the volume on the target
+            params = {
+                "size": size_bytes,
+            }
+            response = requests.post(
+                f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
+                timeout=destination_api_timeout,
+                headers=destination_api_headers,
+                params=params,
+                data=None,
+                verify=destination_api_verify_ssl,
+            )
+            destination_volume_create_status = response.json()
+            if response.status_code != 200:
+                fail(
+                    celery,
+                    f"Failed to create volume {destination_pool}/{volume} on target: {destination_volume_create_status['message']}",
+                )
+                return False
+
+        # Send the volume to the remote
+        cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
+        cluster.connect()
+        ioctx = cluster.open_ioctx(pool)
+        image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
+        size = image.size()
+        chunk_size_mb = 128
+
+        if incremental_parent is not None:
+            # Diff between incremental_parent and this snapshot
+            def diff_chunker():
+                # diff_iterate() drives a callback, so collect the changed
+                # regions first, then yield them as framed (offset, length, data) blocks
+                blocks = list()
+
+                def diff_cb(offset, length, exists):
+                    """Callback to handle diff regions"""
+                    if exists:
+                        data = image.read(offset, length)
+                        blocks.append(
+                            offset.to_bytes(8, "big")
+                            + length.to_bytes(8, "big")
+                            + data
+                        )
+
+                image.diff_iterate(0, size, incremental_parent, diff_cb)
+
+                for block in blocks:
+                    yield block
+
+            data_stream = diff_chunker()
+            celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}"
+        else:
+            # Full image transfer
+            def chunker():
+                chunk_size = 1024 * 1024 * chunk_size_mb
+                current_chunk = 0
+                while current_chunk < size:
+                    chunk = image.read(current_chunk, chunk_size)
+                    yield chunk
+                    current_chunk += chunk_size
+
+            data_stream = chunker()
+            celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
+
+        current_stage += 1
+        update(
+            celery,
+            f"{celery_message} ({send_type})",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        send_params = {
+            "pool": destination_pool,
+            "volume": volume,
+            "snapshot": snapshot_name,
+            "size": size,
+            "source_snapshot": incremental_parent,
+        }
+        send_headers = {
+            "X-Api-Key": destination_api_key,
+            "Content-Type": "application/octet-stream",
+            "Transfer-Encoding": None,  # Disable chunked transfer encoding
+        }
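+        # Stream the volume data from the generator to the remote API so the
+        # image is uploaded in chunks rather than read fully into memory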
+        try:
+            response = requests.post(
+                f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
+                timeout=destination_api_timeout,
+                headers=send_headers,
+                params=send_params,
+                data=data_stream,
+                verify=destination_api_verify_ssl,
+            )
+            response.raise_for_status()
+        except Exception as e:
+            fail(
+                celery,
+                f"Failed to send snapshot: {e}",
+            )
+            return False
+        finally:
+            image.close()
+            ioctx.close()
+            cluster.shutdown()
+
+    # Send the VM configuration
+    if current_destination_vm_state is None:
+        # This is a new VM, so define it
+        # response = requests.post()
+        pass
+    else:
+        # This is a modification
+        # response = requests.post()
+        pass
diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py
index 173752f9..059921f3 100755
--- a/worker-daemon/pvcworkerd/Daemon.py
+++ b/worker-daemon/pvcworkerd/Daemon.py
@@ -33,6 +33,7 @@ from daemon_lib.vm import (
     vm_worker_rollback_snapshot,
     vm_worker_export_snapshot,
     vm_worker_import_snapshot,
+    vm_worker_send_snapshot,
 )
 from daemon_lib.ceph import (
     osd_worker_add_osd,
@@ -227,6 +228,39 @@ def vm_import_snapshot(
     )
 
 
+@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
+def vm_send_snapshot(
+    self,
+    domain=None,
+    snapshot_name=None,
+    destination_api_uri="",
+    destination_api_key="",
+    destination_api_verify_ssl=True,
+    incremental_parent=None,
+    destination_storage_pool=None,
+    run_on="primary",
+):
+    @ZKConnection(config)
+    def run_vm_send_snapshot(
+        zkhandler,
+        self,
+        domain,
+        snapshot_name,
+        destination_api_uri,
+        destination_api_key,
+        destination_api_verify_ssl=True,
+        incremental_parent=None,
+        destination_storage_pool=None,
+    ):
+        return vm_worker_send_snapshot(
+            zkhandler,
+            self,
+            domain,
+            snapshot_name,
+            destination_api_uri,
+            destination_api_key,
+            destination_api_verify_ssl=destination_api_verify_ssl,
+            incremental_parent=incremental_parent,
+            destination_storage_pool=destination_storage_pool,
+        )
+
+    return run_vm_send_snapshot(
+        self,
+        domain,
+        snapshot_name,
+        destination_api_uri,
+        destination_api_key,
+        destination_api_verify_ssl=destination_api_verify_ssl,
+        incremental_parent=incremental_parent,
+        destination_storage_pool=destination_storage_pool,
+    )
+
+
 @celery.task(name="osd.add", bind=True, routing_key="run_on")
 def osd_add(
     self,