diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index 91a71641..0c325bd7 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -3572,6 +3572,131 @@ class API_VM_Snapshot_Import(Resource):
 api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
 
 
+# /vm/<vm>/snapshot/send
+class API_VM_Snapshot_Send(Resource):
+    @RequestParser(
+        [
+            {
+                "name": "snapshot_name",
+                "required": True,
+                "helptext": "A snapshot name must be specified",
+            },
+            {
+                "name": "destination_api_uri",
+                "required": True,
+                "helptext": "A destination API URI must be specified",
+            },
+            {
+                "name": "destination_api_key",
+                "required": True,
+                "helptext": "A destination API key must be specified",
+            },
+            {
+                "name": "destination_api_verify_ssl",
+                "required": False,
+            },
+            {
+                "name": "incremental_parent",
+                "required": False,
+            },
+            {
+                "name": "destination_storage_pool",
+                "required": False,
+            },
+        ]
+    )
+    @Authenticator
+    def post(self, vm, reqargs):
+        """
+        Send a snapshot of a VM's disks and configuration to another PVC cluster
+        ---
+        tags:
+          - vm
+        parameters:
+          - in: query
+            name: snapshot_name
+            type: string
+            required: true
+            description: The name of the snapshot to export (must exist)
+          - in: query
+            name: destination_api_uri
+            type: string
+            required: true
+            description: The base API URI of the destination PVC cluster (with prefix if applicable)
+          - in: query
+            name: destination_api_key
+            type: string
+            required: true
+            description: The API authentication key of the destination PVC cluster
+          - in: query
+            name: destination_api_verify_ssl
+            type: boolean
+            required: false
+            default: true
+            description: Whether or not to validate SSL certificates for an SSL-enabled destination API
+          - in: query
+            name: incremental_parent
+            type: string
+            required: false
+            description: A snapshot name to generate an incremental diff from; a full send is performed if unset
+          - in: query
+            name: destination_storage_pool
+            type: string
+            required: false
+            default: source storage pool name
+            description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
+        responses:
+          202:
+            description: Accepted
+            schema:
+              type: string
+              description: The Celery job ID of the task
+          400:
+            description: Execution error
+            schema:
+              type: object
+              id: Message
+          404:
+            description: Not found
+            schema:
+              type: object
+              id: Message
+        """
+        snapshot_name = reqargs.get("snapshot_name", None)
+        destination_api_uri = reqargs.get("destination_api_uri", None)
+        destination_api_key = reqargs.get("destination_api_key", None)
+        destination_api_verify_ssl = bool(
+            strtobool(reqargs.get("destination_api_verify_ssl", "true"))
+        )
+        incremental_parent = reqargs.get("incremental_parent", None)
+        destination_storage_pool = reqargs.get("destination_storage_pool", None)
+
+        task = run_celery_task(
+            "vm.send_snapshot",
+            domain=vm,
+            snapshot_name=snapshot_name,
+            destination_api_uri=destination_api_uri,
+            destination_api_key=destination_api_key,
+            destination_api_verify_ssl=destination_api_verify_ssl,
+            incremental_parent=incremental_parent,
+            destination_storage_pool=destination_storage_pool,
+            run_on="primary",
+        )
+
+        return (
+            {
+                "task_id": task.id,
+                "task_name": "vm.send_snapshot",
+                "run_on": f"{get_primary_node()} (primary)",
+            },
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
+        )
+
+
+api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
+
+
 # /vm/<vm>/snapshot/receive/block
 class API_VM_Snapshot_Receive_Block(Resource):
     @RequestParser(
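For reference, a minimal sketch of how an API consumer might drive the new send endpoint. This is illustrative only and not part of the patch: the URIs, API keys, VM name, and snapshot names are placeholders, while the query parameters mirror the RequestParser definition above.

import requests

# Placeholder values; substitute real cluster details
source_api_uri = "https://pvc0.example.com:7370/api/v1"
source_api_headers = {"X-Api-Key": "source-cluster-api-key"}

response = requests.post(
    f"{source_api_uri}/vm/testvm/snapshot/send",
    headers=source_api_headers,
    params={
        "snapshot_name": "snap1",
        "destination_api_uri": "https://pvc1.example.com:7370/api/v1",
        "destination_api_key": "destination-cluster-api-key",
        # Optional: incremental send and alternate destination pool
        "incremental_parent": "snap0",
        "destination_storage_pool": "vms",
    },
    verify=True,
)
# A successful submission returns 202 with the Celery task details; the
# Location header points at the task status resource
print(response.status_code, response.json())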
diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py
index 85842976..8c4190d7 100755
--- a/api-daemon/pvcapid/helper.py
+++ b/api-daemon/pvcapid/helper.py
@@ -1294,12 +1294,14 @@ def vm_flush_locks(zkhandler, vm):
     return output, retcode
 
 
-def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
+def vm_snapshot_receive_block(
+    pool, volume, snapshot, size, stream, source_snapshot=None
+):
     try:
         import rados
         import rbd
 
-        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
+        cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
         cluster.connect()
         ioctx = cluster.open_ioctx(pool)
 
@@ -1321,9 +1323,9 @@ def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snaps
                 break
 
             # Extract the offset and length (8 bytes each) and the data
-            offset = int.from_bytes(chunk[:8], 'big')
-            length = int.from_bytes(chunk[8:16], 'big')
-            data = chunk[16:16 + length]
+            offset = int.from_bytes(chunk[:8], "big")
+            length = int.from_bytes(chunk[8:16], "big")
+            data = chunk[16 : 16 + length]
 
             image.write(data, offset)
 
         image.create_snap(snapshot)
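The receive helper above expects a simple binary framing for incremental data: each chunk is an 8-byte big-endian offset, an 8-byte big-endian length, and then the raw data. A hedged, self-contained illustration of the encode/decode round trip follows; the sample values are arbitrary, only the framing itself comes from the helper.

# Encode one changed region the way a sender would frame it
offset = 4 * 1024 * 1024  # example: region starts at 4 MiB
data = b"\x42" * 512  # example: 512 bytes of changed data
length = len(data)
chunk = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data

# Decode it again, mirroring vm_snapshot_receive_block above
parsed_offset = int.from_bytes(chunk[:8], "big")
parsed_length = int.from_bytes(chunk[8:16], "big")
parsed_data = chunk[16 : 16 + parsed_length]
assert (parsed_offset, parsed_length, parsed_data) == (offset, length, data)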
diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py
index da860817..5c3f303f 100644
--- a/client-cli/pvc/cli/cli.py
+++ b/client-cli/pvc/cli/cli.py
@@ -2018,6 +2018,100 @@ def cli_vm_snapshot_import(
     finish(retcode, retmsg)
 
 
+###############################################################################
+# > pvc vm snapshot send
+###############################################################################
+@click.command(
+    name="send",
+    short_help="Send a snapshot of a virtual machine to another PVC cluster.",
+)
+@connection_req
+@click.argument("domain")
+@click.argument("snapshot_name")
+@click.argument("destination")
+@click.option(
+    "-k",
+    "--destination-api-key",
+    "destination_api_key",
+    default=None,
+    help="The API key of the destination cluster when specifying an API URI.",
+)
+@click.option(
+    "-p",
+    "--destination-pool",
+    "destination_storage_pool",
+    default=None,
+    help="The target storage pool on the destination cluster, if it differs from the source pool.",
+)
+@click.option(
+    "-i",
+    "--incremental",
+    "incremental_parent",
+    default=None,
+    help="Perform an incremental volume send from this parent snapshot.",
+)
+@click.option(
+    "--wait/--no-wait",
+    "wait_flag",
+    is_flag=True,
+    default=True,
+    show_default=True,
+    help="Wait or don't wait for task to complete, showing progress if waiting",
+)
+def cli_vm_snapshot_send(
+    domain,
+    snapshot_name,
+    destination,
+    destination_api_key,
+    destination_storage_pool,
+    incremental_parent,
+    wait_flag,
+):
+    """
+    Send the (existing) snapshot SNAPSHOT_NAME of virtual machine DOMAIN to the PVC cluster DESTINATION.
+
+    DOMAIN may be a UUID or name.
+
+    DESTINATION may either be a configured PVC connection name in this CLI instance (i.e. a valid local argument to "--connection"), or a full API URL, including port and API prefix; if using the latter, an API key can be specified with the "-k"/"--destination-api-key" option.
+
+    The send will include the VM configuration, metainfo, and a point-in-time snapshot of all attached RBD volumes.
+
+    By default, the storage pool of the sending cluster will be used at the destination cluster as well. If a pool of that name does not exist, specify one with the "-p"/"--destination-pool" option.
+
+    Incremental sends are possible by specifying the "-i"/"--incremental" option along with a parent snapshot name. To correctly receive, that parent snapshot must exist on DESTINATION.
+    """
+
+    connections_config = get_store(CLI_CONFIG["store_path"])
+    if destination in connections_config.keys():
+        destination_cluster_config = connections_config[destination]
+        destination_api_uri = "{}://{}:{}{}".format(
+            destination_cluster_config["scheme"],
+            destination_cluster_config["host"],
+            destination_cluster_config["port"],
+            CLI_CONFIG["api_prefix"],
+        )
+        destination_api_key = destination_cluster_config["api_key"]
+    else:
+        destination_api_uri = destination
+        destination_api_key = destination_api_key
+
+    retcode, retmsg = pvc.lib.vm.vm_send_snapshot(
+        CLI_CONFIG,
+        domain,
+        snapshot_name,
+        destination_api_uri,
+        destination_api_key,
+        destination_api_verify_ssl=CLI_CONFIG.get("verify_ssl"),
+        destination_storage_pool=destination_storage_pool,
+        incremental_parent=incremental_parent,
+        wait_flag=wait_flag,
+    )
+
+    if retcode and wait_flag:
+        retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
+    finish(retcode, retmsg)
+
+
 ###############################################################################
 # > pvc vm backup
 ###############################################################################
@@ -6588,6 +6682,7 @@ cli_vm_snapshot.add_command(cli_vm_snapshot_remove)
 cli_vm_snapshot.add_command(cli_vm_snapshot_rollback)
 cli_vm_snapshot.add_command(cli_vm_snapshot_export)
 cli_vm_snapshot.add_command(cli_vm_snapshot_import)
+cli_vm_snapshot.add_command(cli_vm_snapshot_send)
 cli_vm.add_command(cli_vm_snapshot)
 cli_vm_backup.add_command(cli_vm_backup_create)
 cli_vm_backup.add_command(cli_vm_backup_restore)
diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py
index ce1e92d0..a1de258c 100644
--- a/client-cli/pvc/lib/vm.py
+++ b/client-cli/pvc/lib/vm.py
@@ -595,6 +595,43 @@ def vm_import_snapshot(
     return get_wait_retdata(response, wait_flag)
 
 
+def vm_send_snapshot(
+    config,
+    vm,
+    snapshot_name,
+    destination_api_uri,
+    destination_api_key,
+    destination_api_verify_ssl=True,
+    destination_storage_pool=None,
+    incremental_parent=None,
+    wait_flag=True,
+):
+    """
+    Send an (existing) snapshot of a VM's disks and configuration to a destination PVC cluster, optionally
+    incremental with incremental_parent
+
+    API endpoint: POST /vm/{vm}/snapshot/send
+    API arguments: snapshot_name=snapshot_name, destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, incremental_parent=incremental_parent
+    API schema: {"message":"{data}"}
+    """
+    params = {
+        "snapshot_name": snapshot_name,
+        "destination_api_uri": destination_api_uri,
+        "destination_api_key": destination_api_key,
+        "destination_api_verify_ssl": destination_api_verify_ssl,
+    }
+    if destination_storage_pool is not None:
+        params["destination_storage_pool"] = destination_storage_pool
+    if incremental_parent is not None:
+        params["incremental_parent"] = incremental_parent
+
+    response = call_api(
+        config, "post", "/vm/{vm}/snapshot/send".format(vm=vm), params=params
+    )
+
+    return get_wait_retdata(response, wait_flag)
+
+
 def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_flag=True):
     """
     Perform a cluster VM autobackup
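To illustrate how DESTINATION resolves to an API URI in the CLI command above, a small hedged sketch follows; the connection values and API prefix are placeholders, but the dictionary keys and the format string match the CLI logic.

# Placeholder stored-connection entry of the shape the CLI reads from its store
destination_cluster_config = {
    "scheme": "https",
    "host": "pvc1.example.com",
    "port": 7370,
    "api_key": "destination-cluster-api-key",
}
api_prefix = "/api/v1"  # placeholder; the CLI takes this from CLI_CONFIG

destination_api_uri = "{}://{}:{}{}".format(
    destination_cluster_config["scheme"],
    destination_cluster_config["host"],
    destination_cluster_config["port"],
    api_prefix,
)
# -> "https://pvc1.example.com:7370/api/v1"

A hypothetical invocation would then look like "pvc vm snapshot send testvm snap1 pvc1 -p vms" for a stored connection name, or pass the full API URI plus "-k"/"--destination-api-key" for a raw destination.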
diff --git a/daemon-common/vm.py b/daemon-common/vm.py
index 88f733cf..20d9c1bc 100644
--- a/daemon-common/vm.py
+++ b/daemon-common/vm.py
@@ -3119,3 +3119,418 @@ def vm_worker_import_snapshot(
         current=current_stage,
         total=total_stages,
     )
+
+
+def vm_worker_send_snapshot(
+    zkhandler,
+    celery,
+    domain,
+    snapshot_name,
+    destination_api_uri,
+    destination_api_key,
+    destination_api_verify_ssl=True,
+    incremental_parent=None,
+    destination_storage_pool=None,
+):
+    import requests
+    import rados
+    import rbd
+    from packaging.version import parse as parse_version
+
+    current_stage = 0
+    total_stages = 1
+    start(
+        celery,
+        f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
+        current=current_stage,
+        total=total_stages,
+    )
+
+    # Validate that VM exists in cluster
+    dom_uuid = getDomainUUID(zkhandler, domain)
+    if not dom_uuid:
+        fail(
+            celery,
+            f"Could not find VM '{domain}' in the cluster",
+        )
+        return False
+
+    # Get our side's VM configuration details
+    vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
+    if not isinstance(vm_detail, dict):
+        fail(
+            celery,
+            f"VM listing returned invalid data: {vm_detail}",
+        )
+        return False
+
+    # Check if the snapshot exists
+    if not zkhandler.exists(
+        ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
+    ):
+        fail(
+            celery,
+            f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
+        )
+        return False
+
+    # Check if the incremental parent exists
+    if incremental_parent is not None and not zkhandler.exists(
+        ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
+    ):
+        fail(
+            celery,
+            f"Could not find incremental parent snapshot '{incremental_parent}' of VM '{domain}'",
+        )
+        return False
+
+    # Validate that the destination cluster can be reached
+    destination_api_timeout = (3.05, 172800)
+    destination_api_headers = {
+        "X-Api-Key": destination_api_key,
+        "Content-Type": "application/octet-stream",
+    }
+
+    try:
+        # Hit the API root; this should return "PVC API version x"
+        response = requests.get(
+            f"{destination_api_uri}/",
+            timeout=destination_api_timeout,
+            headers=None,
+            params=None,
+            data=None,
+            verify=destination_api_verify_ssl,
+        )
+        if "PVC API" not in response.json().get("message"):
+            raise ValueError("Remote API is not a PVC API or incorrect URI given")
+    except requests.exceptions.ConnectionError as e:
+        fail(
+            celery,
+            f"Connection to remote API timed out: {e}",
+        )
+        return False
+    except ValueError as e:
+        fail(
+            celery,
+            f"Connection to remote API is not valid: {e}",
+        )
+        return False
+    except Exception as e:
+        fail(
+            celery,
+            f"Connection to remote API failed: {e}",
+        )
+        return False
+
+    # Hit the API "/status" endpoint to validate API key and cluster status
+    response = requests.get(
+        f"{destination_api_uri}/status",
+        timeout=destination_api_timeout,
+        headers=destination_api_headers,
+        params=None,
+        data=None,
+        verify=destination_api_verify_ssl,
+    )
+    destination_cluster_status = response.json()
+    current_destination_pvc_version = destination_cluster_status.get(
+        "pvc_version", None
+    )
+    if current_destination_pvc_version is None:
+        fail(
+            celery,
+            "Connection to remote API failed: no PVC version information returned",
+        )
+        return False
+
+    expected_destination_pvc_version = "0.9.100"  # TODO: 0.9.101 when completed
+    # Work around development versions
+    current_destination_pvc_version = re.sub(
+        r"~git-.*", "", current_destination_pvc_version
+    )
+    # Compare versions
+    if parse_version(current_destination_pvc_version) < parse_version(
+        expected_destination_pvc_version
+    ):
+        fail(
+            celery,
+            f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
+        )
+        return False
+
+    # Check if the VM already exists on the remote
+    response = requests.get(
+        f"{destination_api_uri}/vm/{domain}",
+        timeout=destination_api_timeout,
+        headers=destination_api_headers,
+        params=None,
+        data=None,
+        verify=destination_api_verify_ssl,
+    )
+    destination_vm_status = response.json()
+    current_destination_vm_state = destination_vm_status.get("state", None)
+    if (
+        current_destination_vm_state is not None
+        and current_destination_vm_state != "mirror"
+    ):
+        fail(
+            celery,
+            "Remote PVC VM exists and is not a mirror",
+        )
+        return False
+
+    # Get details about VM snapshot
+    _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
+        [
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.name",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.timestamp",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.xml",
+                    snapshot_name,
+                )
+            ),
+            (
+                (
+                    "domain.snapshots",
+                    dom_uuid,
+                    "domain_snapshot.rbd_snapshots",
+                    snapshot_name,
+                )
+            ),
+        ]
+    )
+    snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
+
+    # Get details about remote VM snapshots
+    destination_vm_snapshots = destination_vm_status.get("snapshots", [])
+
+    # Check if this snapshot is in the remote list already
+    if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
+        fail(
+            celery,
+            f"Snapshot {snapshot_name} already exists on the target",
+        )
+        return False
+
+    # Check if this snapshot is older than the latest remote VM snapshot
+    if (
+        len(destination_vm_snapshots) > 0
+        and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
+    ):
+        fail(
+            celery,
+            f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
+        )
+        return False
+
+    # Check that our incremental parent exists on the remote VM
+    if incremental_parent is not None:
+        if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
+            fail(
+                celery,
+                f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
+            )
+            return False
+
+    # Begin send, set stages
+    total_stages = (
+        2
+        + (2 * len(snapshot_rbdsnaps))
+        + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
+    )
+
+    # Create the block devices on the remote side if this is a new VM send
+    for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
+        rbd_name = rbd_detail["name"]
+        pool, volume = rbd_name.split("/")
+
+        current_stage += 1
+        update(
+            celery,
+            f"Preparing remote volume for {rbd_name}@{snapshot_name}",
+            current=current_stage,
+            total=total_stages,
+        )
+
+        # Get the storage volume details
+        retcode, retdata = ceph.get_list_volume(
+            zkhandler, pool, volume, is_fuzzy=False
+        )
+        if not retcode or len(retdata) != 1:
+            if len(retdata) < 1:
+                error_message = f"No detail returned for volume {rbd_name}"
+            elif len(retdata) > 1:
+                error_message = f"Multiple details returned for volume {rbd_name}"
+            else:
+                error_message = f"Error getting details for volume {rbd_name}"
+            fail(
+                celery,
+                error_message,
+            )
+            return False
+
+        try:
+            size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
+        except Exception as e:
+            error_message = f"Failed to get volume size for {rbd_name}: {e}"
+            fail(
+                celery,
+                error_message,
+            )
+            return False
+
+        # Keep the local (source) pool name for reading the image; "pool" may
+        # refer to the destination pool from this point onward
+        source_pool = pool
+        if destination_storage_pool is not None:
+            pool = destination_storage_pool
+
+        if current_destination_vm_state is None:
+            current_stage += 1
+            update(
+                celery,
+                f"Creating remote volume {pool}/{volume} for {rbd_name}@{snapshot_name}",
+                current=current_stage,
+                total=total_stages,
+            )
+
+            # Check if the volume exists on the target
+            response = requests.get(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + timeout=destination_api_timeout, + headers=destination_api_headers, + params=None, + data=None, + verify=destination_api_verify_ssl, + ) + if response.status_code != 404: + fail( + celery, + f"Remote storage pool {pool} already contains volume {volume}", + ) + return False + + # Create the volume on the target + params = { + "size": size_bytes, + } + response = requests.post( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + timeout=destination_api_timeout, + headers=destination_api_headers, + params=params, + data=None, + verify=destination_api_verify_ssl, + ) + destination_volume_create_status = response.json() + if response.status_code != 200: + fail( + celery, + f"Failed to create volume {rbd_name} on target: {destination_volume_create_status['message']}", + ) + return False + + # Send the volume to the remote + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) + image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True) + size = image.size() + chunk_size_mb = 128 + + if incremental_parent is not None: + # Diff between incremental_parent and snapshot + celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}" + + def diff_chunker(): + def diff_cb(offset, length, exists): + """Callback to handle diff regions""" + if exists: + data = image.read(offset, length) + yield ( + offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data + ) + + image.set_snap(incremental_parent) + image.diff_iterate(0, size, incremental_parent, diff_cb) + + data_stream = diff_chunker() + else: + # Full image transfer + celery_message = f"Sending full image of {rbd_name}@{snapshot_name}" + + def chunker(): + chunk_size = 1024 * 1024 * chunk_size_mb + current_chunk = 0 + last_chunk_time = time.time() + while current_chunk < size: + chunk = image.read(current_chunk, chunk_size) + yield chunk + current_chunk += chunk_size + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(chunk_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, + ) + + data_stream = chunker() + + current_stage += 1 + update( + celery, + celery_message, + current=current_stage, + total=total_stages, + ) + + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "size": size, + "source_snapshot": incremental_parent, + } + send_headers = { + "X-Api-Key": destination_api_key, + "Content-Type": "application/octet-stream", + "Transfer-Encoding": None, # Disable chunked transfer encoding + } + try: + response = requests.post( + f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", + timeout=destination_api_timeout, + headers=send_headers, + params=send_params, + data=data_stream, + verify=destination_api_verify_ssl, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send snapshot: {e}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + + # Send the VM configuration + # if current_destination_vm_state is None: + # This is a new VM, so define it + # response = requests.post() + # else: + # This is a modification + # response = requests.post() diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py index 00e2a409..20fa6df4 100755 --- 
diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py
index 00e2a409..20fa6df4 100755
--- a/worker-daemon/pvcworkerd/Daemon.py
+++ b/worker-daemon/pvcworkerd/Daemon.py
@@ -33,6 +33,7 @@ from daemon_lib.vm import (
     vm_worker_rollback_snapshot,
     vm_worker_export_snapshot,
     vm_worker_import_snapshot,
+    vm_worker_send_snapshot,
 )
 from daemon_lib.ceph import (
     osd_worker_add_osd,
@@ -227,6 +228,54 @@ def vm_import_snapshot(
     )
 
 
+@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
+def vm_send_snapshot(
+    self,
+    domain=None,
+    snapshot_name=None,
+    destination_api_uri="",
+    destination_api_key="",
+    destination_api_verify_ssl=True,
+    incremental_parent=None,
+    destination_storage_pool=None,
+    run_on="primary",
+):
+    @ZKConnection(config)
+    def run_vm_send_snapshot(
+        zkhandler,
+        self,
+        domain,
+        snapshot_name,
+        destination_api_uri,
+        destination_api_key,
+        destination_api_verify_ssl=True,
+        incremental_parent=None,
+        destination_storage_pool=None,
+    ):
+        return vm_worker_send_snapshot(
+            zkhandler,
+            self,
+            domain,
+            snapshot_name,
+            destination_api_uri,
+            destination_api_key,
+            destination_api_verify_ssl=destination_api_verify_ssl,
+            incremental_parent=incremental_parent,
+            destination_storage_pool=destination_storage_pool,
+        )
+
+    return run_vm_send_snapshot(
+        self,
+        domain,
+        snapshot_name,
+        destination_api_uri,
+        destination_api_key,
+        destination_api_verify_ssl=destination_api_verify_ssl,
+        incremental_parent=incremental_parent,
+        destination_storage_pool=destination_storage_pool,
+    )
+
+
 @celery.task(name="osd.add", bind=True, routing_key="run_on")
 def osd_add(
     self,
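Finally, a hedged sketch of how a caller might track the dispatched task: the 202 response from the send endpoint carries a Location header pointing at the task status resource, which can be polled. The URI, key, polling interval, and the exact status field names are assumptions for illustration only.

import time
import requests

source_api_uri = "https://pvc0.example.com:7370/api/v1"  # placeholder
source_api_headers = {"X-Api-Key": "source-cluster-api-key"}  # placeholder

submit = requests.post(
    f"{source_api_uri}/vm/testvm/snapshot/send",
    headers=source_api_headers,
    params={
        "snapshot_name": "snap1",
        "destination_api_uri": "https://pvc1.example.com:7370/api/v1",
        "destination_api_key": "destination-cluster-api-key",
    },
    verify=True,
)
task_url = submit.headers["Location"]  # task status resource for this job

# Poll until the task reaches a terminal state (field names assumed)
while True:
    status = requests.get(task_url, headers=source_api_headers, verify=True).json()
    if status.get("state") not in ("PENDING", "STARTED", "RUNNING", "PROGRESS"):
        break
    time.sleep(5)
print(status)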