Add VM snapshot send (initial)
This commit is contained in:
		| @@ -3119,3 +3119,418 @@ def vm_worker_import_snapshot( | ||||
|         current=current_stage, | ||||
|         total=total_stages, | ||||
|     ) | ||||
|  | ||||
|  | ||||
def _snapshot_full_stream(image, size, chunk_size_mb, report_speed):
    """Yield the full contents of an open RBD image in fixed-size chunks.

    After each chunk has been consumed, `report_speed` is called with the
    observed throughput in MB/s (rounded to one decimal place).
    """
    chunk_size = 1024 * 1024 * chunk_size_mb
    offset = 0
    last_chunk_time = time.time()
    while offset < size:
        # Never request bytes beyond the end of the image
        chunk = image.read(offset, min(chunk_size, size - offset))
        yield chunk
        offset += chunk_size
        current_chunk_time = time.time()
        chunk_time = current_chunk_time - last_chunk_time
        last_chunk_time = current_chunk_time
        # Skip the report rather than divide by zero on a coarse clock
        if chunk_time > 0:
            report_speed(round(len(chunk) / (1024 * 1024) / chunk_time, 1))


def _snapshot_diff_stream(image, size, incremental_parent):
    """Yield the extents that changed since `incremental_parent`.

    Each yielded record is an 8-byte big-endian offset, an 8-byte big-endian
    length, and then the extent data read from the open image.
    """
    extents = []

    def record_extent(offset, length, exists):
        # The callback is a plain function: a generator here would never run
        # its body when invoked by diff_iterate. Collect extents instead.
        if exists:
            extents.append((offset, length))

    # The image stays on the snapshot being sent; the diff is computed
    # from incremental_parent up to that snapshot.
    image.diff_iterate(0, size, incremental_parent, record_extent)

    # NOTE(review): each extent is read and sent as a single record; very
    # large extents are held in memory whole - confirm this is acceptable.
    for offset, length in extents:
        data = image.read(offset, length)
        yield offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data


def vm_worker_send_snapshot(
    zkhandler,
    celery,
    domain,
    snapshot_name,
    destination_api_uri,
    destination_api_key,
    destination_api_verify_ssl=True,
    incremental_parent=None,
    destination_storage_pool=None,
):
    """Send a snapshot of a VM's RBD volumes to a remote PVC cluster API.

    The local VM, snapshot and (optional) incremental parent are validated,
    the remote API is checked for reachability, version and existing VM
    state, and then each RBD disk of the VM is streamed to the remote, either
    in full or as a diff against `incremental_parent`.

    Args:
        zkhandler: Zookeeper handler used for all local cluster lookups.
        celery: Task handle passed to start()/update()/fail() for progress.
        domain: Name of the VM; also used in the remote "/vm/<domain>" query.
        snapshot_name: Name of the local VM snapshot to send.
        destination_api_uri: Base URI of the remote API.
        destination_api_key: Value sent in the "X-Api-Key" request header.
        destination_api_verify_ssl: Passed to requests as `verify`.
        incremental_parent: Optional snapshot name to diff against; it must
            exist both locally and on the remote VM.
        destination_storage_pool: Optional pool name to use on the remote
            instead of each volume's source pool.

    Returns:
        False after reporting via fail() on any validation or transfer error.
        NOTE(review): the success path is not finished yet (the VM
        configuration is not sent) and currently falls off the end,
        returning None.
    """
    import requests
    import rados
    import rbd
    from packaging.version import parse as parse_version

    current_stage = 0
    total_stages = 1
    start(
        celery,
        f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
        current=current_stage,
        total=total_stages,
    )

    # Validate that VM exists in cluster
    dom_uuid = getDomainUUID(zkhandler, domain)
    if not dom_uuid:
        fail(
            celery,
            f"Could not find VM '{domain}' in the cluster",
        )
        return False

    # Get our side's VM configuration details
    vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
    if not isinstance(vm_detail, dict):
        fail(
            celery,
            f"VM listing returned invalid data: {vm_detail}",
        )
        return False

    # Check if the snapshot exists
    if not zkhandler.exists(
        ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
    ):
        fail(
            celery,
            f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
        )
        return False

    # Check if the incremental parent exists
    if incremental_parent is not None and not zkhandler.exists(
        ("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
    ):
        fail(
            celery,
            f"Could not find snapshot '{incremental_parent}' of VM '{domain}'",
        )
        return False

    # Validate that the destination cluster can be reached
    # requests timeout tuple: (connect seconds, read seconds)
    destination_api_timeout = (3.05, 172800)
    destination_api_headers = {
        "X-Api-Key": destination_api_key,
        "Content-Type": "application/octet-stream",
    }

    try:
        # Hit the API root; this should return "PVC API version x"
        response = requests.get(
            f"{destination_api_uri}/",
            timeout=destination_api_timeout,
            headers=None,
            params=None,
            data=None,
            verify=destination_api_verify_ssl,
        )
        if "PVC API" not in response.json().get("message"):
            raise ValueError("Remote API is not a PVC API or incorrect URI given")
    except requests.exceptions.ConnectionError as e:
        fail(
            celery,
            f"Connection to remote API timed out: {e}",
        )
        return False
    except ValueError as e:
        fail(
            celery,
            f"Connection to remote API is not valid: {e}",
        )
        return False
    except Exception as e:
        fail(
            celery,
            f"Connection to remote API failed: {e}",
        )
        return False

    # Hit the API "/status" endpoint to validate API key and cluster status
    try:
        response = requests.get(
            f"{destination_api_uri}/status",
            timeout=destination_api_timeout,
            headers=destination_api_headers,
            params=None,
            data=None,
            verify=destination_api_verify_ssl,
        )
        destination_cluster_status = response.json()
        current_destination_pvc_version = destination_cluster_status.get(
            "pvc_version", None
        )
    except Exception as e:
        fail(
            celery,
            f"Connection to remote API failed: {e}",
        )
        return False
    if current_destination_pvc_version is None:
        fail(
            celery,
            "Connection to remote API failed: no PVC version information returned",
        )
        return False

    expected_destination_pvc_version = "0.9.100"  # TODO: 0.9.101 when completed
    # Work around development versions
    current_destination_pvc_version = re.sub(
        r"~git-.*", "", current_destination_pvc_version
    )
    # Compare versions
    if parse_version(current_destination_pvc_version) < parse_version(
        expected_destination_pvc_version
    ):
        fail(
            celery,
            f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
        )
        return False

    # Check if the VM already exists on the remote
    try:
        response = requests.get(
            f"{destination_api_uri}/vm/{domain}",
            timeout=destination_api_timeout,
            headers=destination_api_headers,
            params=None,
            data=None,
            verify=destination_api_verify_ssl,
        )
        destination_vm_status = response.json()
        current_destination_vm_state = destination_vm_status.get("state", None)
    except Exception as e:
        fail(
            celery,
            f"Failed to query VM '{domain}' on remote API: {e}",
        )
        return False
    if (
        current_destination_vm_state is not None
        and current_destination_vm_state != "mirror"
    ):
        fail(
            celery,
            "Remote PVC VM exists and is not a mirror",
        )
        return False

    # Get details about VM snapshot
    snapshot_keys = (
        "domain_snapshot.name",
        "domain_snapshot.timestamp",
        "domain_snapshot.xml",
        "domain_snapshot.rbd_snapshots",
    )
    _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
        [("domain.snapshots", dom_uuid, key, snapshot_name) for key in snapshot_keys]
    )
    snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")

    # Get details about remote VM snapshots
    destination_vm_snapshots = destination_vm_status.get("snapshots", [])
    destination_snapshot_names = [s["name"] for s in destination_vm_snapshots]

    # Check if this snapshot is in the remote list already
    if snapshot_name in destination_snapshot_names:
        fail(
            celery,
            f"Snapshot {snapshot_name} already exists on the target",
        )
        return False

    # Check if this snapshot is older than the latest remote VM snapshot
    # NOTE(review): assumes the remote list is newest-first and that both
    # timestamps are of comparable types - confirm against the API.
    if (
        len(destination_vm_snapshots) > 0
        and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
    ):
        fail(
            celery,
            f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
        )
        return False

    # Check that our incremental parent exists on the remote VM
    if (
        incremental_parent is not None
        and incremental_parent not in destination_snapshot_names
    ):
        fail(
            celery,
            f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
        )
        return False

    # Begin send, set stages
    total_stages = (
        2
        + (2 * len(snapshot_rbdsnaps))
        + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
    )

    chunk_size_mb = 128

    # Create the block devices on the remote side if this is a new VM send
    for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
        rbd_name = rbd_detail["name"]
        source_pool, volume = rbd_name.split("/")
        # Keep source and destination pools separate: the local image must
        # always be opened from the pool it actually lives in.
        if destination_storage_pool is not None:
            destination_pool = destination_storage_pool
        else:
            destination_pool = source_pool

        current_stage += 1
        update(
            celery,
            f"Preparing remote volume for {rbd_name}@{snapshot_name}",
            current=current_stage,
            total=total_stages,
        )

        # Get the storage volume details
        retcode, retdata = ceph.get_list_volume(
            zkhandler, source_pool, volume, is_fuzzy=False
        )
        if not retcode or len(retdata) != 1:
            if len(retdata) < 1:
                error_message = f"No detail returned for volume {rbd_name}"
            elif len(retdata) > 1:
                error_message = f"Multiple details returned for volume {rbd_name}"
            else:
                error_message = f"Error getting details for volume {rbd_name}"
            fail(
                celery,
                error_message,
            )
            return False

        try:
            size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
        except Exception as e:
            # Without a size the remote volume cannot be created; abort
            fail(
                celery,
                f"Failed to get volume size for {rbd_name}: {e}",
            )
            return False

        if current_destination_vm_state is None:
            current_stage += 1
            update(
                celery,
                f"Creating remote volume {destination_pool}/{volume} for {rbd_name}@{snapshot_name}",
                current=current_stage,
                total=total_stages,
            )

            # Check if the volume exists on the target
            try:
                response = requests.get(
                    f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
                    timeout=destination_api_timeout,
                    headers=destination_api_headers,
                    params=None,
                    data=None,
                    verify=destination_api_verify_ssl,
                )
            except Exception as e:
                fail(
                    celery,
                    f"Failed to query volume {destination_pool}/{volume} on target: {e}",
                )
                return False
            if response.status_code != 404:
                fail(
                    celery,
                    f"Remote storage pool {destination_pool} already contains volume {volume}",
                )
                return False

            # Create the volume on the target
            try:
                response = requests.post(
                    f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
                    timeout=destination_api_timeout,
                    headers=destination_api_headers,
                    params={"size": size_bytes},
                    data=None,
                    verify=destination_api_verify_ssl,
                )
                destination_volume_create_status = response.json()
            except Exception as e:
                fail(
                    celery,
                    f"Failed to create volume {rbd_name} on target: {e}",
                )
                return False
            if response.status_code != 200:
                fail(
                    celery,
                    f"Failed to create volume {rbd_name} on target: {destination_volume_create_status.get('message')}",
                )
                return False

        # Send the volume to the remote
        if incremental_parent is not None:
            # Diff between incremental_parent and snapshot
            celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}"
        else:
            # Full image transfer
            celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"

        current_stage += 1
        update(
            celery,
            celery_message,
            current=current_stage,
            total=total_stages,
        )

        def report_speed(speed, message=celery_message, stage=current_stage):
            # Defaults bind this iteration's message and stage
            update(
                celery,
                message + f" ({speed} MB/s)",
                current=stage,
                total=total_stages,
            )

        send_headers = {
            "X-Api-Key": destination_api_key,
            "Content-Type": "application/octet-stream",
            "Transfer-Encoding": None,  # Disable chunked transfer encoding
        }
        try:
            # Context managers guarantee the image, ioctx and cluster handle
            # are released even if opening or sending fails partway through.
            with rados.Rados(conffile="/etc/ceph/ceph.conf") as cluster:
                with cluster.open_ioctx(source_pool) as ioctx:
                    with rbd.Image(
                        ioctx, name=volume, snapshot=snapshot_name, read_only=True
                    ) as image:
                        size = image.size()
                        if incremental_parent is not None:
                            data_stream = _snapshot_diff_stream(
                                image, size, incremental_parent
                            )
                        else:
                            data_stream = _snapshot_full_stream(
                                image, size, chunk_size_mb, report_speed
                            )

                        send_params = {
                            "pool": destination_pool,
                            "volume": volume,
                            "snapshot": snapshot_name,
                            "size": size,
                            "source_snapshot": incremental_parent,
                        }
                        response = requests.post(
                            f"{destination_api_uri}/storage/ceph/volume/{destination_pool}/{volume}",
                            timeout=destination_api_timeout,
                            headers=send_headers,
                            params=send_params,
                            data=data_stream,
                            verify=destination_api_verify_ssl,
                        )
                        response.raise_for_status()
        except Exception as e:
            fail(
                celery,
                f"Failed to send snapshot: {e}",
            )
            return False

    # TODO: Send the VM configuration (snapshot_xml) to the remote:
    #   - if current_destination_vm_state is None this is a new VM, so define it
    #   - otherwise this is a modification of the existing mirror
|   | ||||
		Reference in New Issue
	
	Block a user