Add VM snapshot send (initial)

2024-09-13 11:52:08 -04:00
parent 5c3ce358df
commit fe1740ecd9
3 changed files with 544 additions and 0 deletions


@@ -24,6 +24,7 @@ import re
import os.path
import lxml.objectify
import lxml.etree
import subprocess
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
@@ -3119,3 +3120,389 @@ def vm_worker_import_snapshot(
current=current_stage,
total=total_stages,
)
def vm_worker_send_snapshot(
zkhandler,
celery,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
):
    import rados
    import rbd
    import requests
    from packaging.version import parse as parse_version
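    # NOTE: these imports are deferred into the worker body, presumably so
    # that processes which never send snapshots do not pay their load cost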
current_stage = 0
total_stages = 1
start(
celery,
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
current=current_stage,
total=total_stages,
)
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
# Get our side's VM configuration details
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
if not isinstance(vm_detail, dict):
        fail(
            celery,
            f"VM listing returned invalid data: {vm_detail}",
        )
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Validate that the destination cluster can be reached
destination_api_timeout = (3.05, 172800)
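    # Tuple of (connect timeout, read timeout): 3.05s to establish the
    # connection, 172800s (48 hours) to allow very long-running transfers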
destination_api_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/octet-stream",
}
try:
# Hit the API root; this should return "PVC API version x"
response = requests.get(
f"{destination_api_uri}/",
timeout=destination_api_timeout,
headers=None,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = requests.get(
f"{destination_api_uri}/status",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
destination_cluster_status = response.json()
current_destination_pvc_version = destination_cluster_status.get(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
"Connection to remote API failed: no PVC version information returned",
)
return False
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
# Check if the VM already exists on the remote
response = requests.get(
f"{destination_api_uri}/vm/{domain}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
    destination_vm_status = response.json()
    current_destination_vm_state = destination_vm_status.get("state", None)
    if (
        current_destination_vm_state is not None
        and current_destination_vm_state != "mirror"
    ):
        fail(
            celery,
            "Remote PVC VM exists and is not a mirror",
        )
        return False
    # Get details about VM snapshot
    _, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
        [
            ("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name),
            ("domain.snapshots", dom_uuid, "domain_snapshot.timestamp", snapshot_name),
            ("domain.snapshots", dom_uuid, "domain_snapshot.xml", snapshot_name),
            (
                "domain.snapshots",
                dom_uuid,
                "domain_snapshot.rbd_snapshots",
                snapshot_name,
            ),
        ]
    )
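    # The RBD snapshot list is stored in Zookeeper as one comma-separated string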
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
    # Get details about remote VM snapshots
    destination_vm_snapshots = destination_vm_status.get("snapshots", [])
    latest_destination_vm_snapshot = (
        destination_vm_snapshots[0] if len(destination_vm_snapshots) > 0 else None
    )
# Check if this snapshot is in the remote list already
if snapshot_name in [s['name'] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
    # Check if this snapshot is older than the latest remote VM snapshot
    if (
        latest_destination_vm_snapshot is not None
        and int(snapshot_timestamp) < int(latest_destination_vm_snapshot["timestamp"])
    ):
fail(
celery,
f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
)
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
        if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
# Set send type
send_type = "incremental" if incremental_parent is not None else "full"
    # Begin send, set stages
    total_stages = (
        2
        + (2 * len(snapshot_rbdsnaps))
        + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
    )
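    # Stage math: 2 stages for the start and the final config send, plus a
    # prepare and a send stage per RBD volume, plus a create stage per volume
    # when the VM does not yet exist on the remote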
    # Send each RBD volume, creating the block device on the remote side first
    # if this is a new VM send; the loop variable is named rbd_detail to avoid
    # shadowing the rbd module used below
    for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
        rbd_name = rbd_detail["name"]
        pool, volume = rbd_name.split("/")
current_stage += 1
update(
celery,
f"Preparing remote volume for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Get the storage volume details
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
if not retcode or len(retdata) != 1:
            if len(retdata) < 1:
                error_message = f"No detail returned for volume {rbd_name}"
            elif len(retdata) > 1:
                error_message = f"Multiple details returned for volume {rbd_name}"
            else:
                error_message = f"Error getting details for volume {rbd_name}"
fail(
celery,
error_message,
)
return False
        try:
            size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
        except Exception as e:
            fail(
                celery,
                f"Failed to get volume size for {rbd_name}: {e}",
            )
            return False
        # Keep the local pool name for reads; only the remote side uses the override
        if destination_storage_pool is not None:
            destination_pool = destination_storage_pool
        else:
            destination_pool = pool
if current_destination_vm_state is None:
current_stage += 1
update(
celery,
f"Creating remote volume {pool}/{volume} for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Check if the volume exists on the target
response = requests.get(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if response.status_code != 404:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
# Create the volume on the target
params = {
"size": size_bytes,
}
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=params,
data=None,
verify=destination_api_verify_ssl,
)
destination_volume_create_status = response.json()
if response.status_code != 200:
fail(
celery,
f"Failed to create volume {pool}/{volume} on target: {destination_volume_create_status['message']}",
)
return False
# Send the volume to the remote
        cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
        cluster.connect()
        ioctx = cluster.open_ioctx(pool)
        image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
size = image.size()
chunk_size_mb = 128
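        # 128 MiB chunks bound worker memory use while keeping per-read
        # overhead low; the value is a tuning choice, not a protocol constant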
        if incremental_parent is not None:
            # Diff between incremental_parent and snapshot
            def diff_chunker():
                diff_regions = list()

                def diff_cb(offset, length, exists):
                    """Callback to collect the extents that changed"""
                    if exists:
                        diff_regions.append((offset, length))

                # The image is already open at snapshot_name; iterate the diff
                # from the incremental parent, then stream each changed region
                # as 8-byte BE offset + 8-byte BE length + data
                image.diff_iterate(0, size, incremental_parent, diff_cb)
                for offset, length in diff_regions:
                    data = image.read(offset, length)
                    yield offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data

            data_stream = diff_chunker()
            celery_message = (
                f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}"
            )
        else:
            # Full image transfer
            def chunker():
                chunk_size = 1024 * 1024 * chunk_size_mb
                current_chunk = 0
                while current_chunk < size:
                    # Clamp the final read so we never read past the image end
                    chunk = image.read(
                        current_chunk, min(chunk_size, size - current_chunk)
                    )
                    yield chunk
                    current_chunk += chunk_size

            data_stream = chunker()
            celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
current_stage += 1
        update(
            celery,
            f"{celery_message} ({send_type})",
            current=current_stage,
            total=total_stages,
        )
        send_params = {
            "pool": destination_pool,
            "volume": volume,
            "snapshot": snapshot_name,
            "size": size,
            "source_snapshot": incremental_parent,
        }
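        # source_snapshot presumably tells the receiving API that this stream
        # is an incremental on top of that snapshot; it is None for full sends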
        send_headers = {
            "X-Api-Key": destination_api_key,
            "Content-Type": "application/octet-stream",
            "Transfer-Encoding": None,  # Disable chunked transfer encoding
        }
try:
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
data=data_stream,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send snapshot: {e}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
    # Send the VM configuration
    if current_destination_vm_state is None:
        # This is a new VM, so define it
        # TODO: response = requests.post(...)
        pass
    else:
        # This is a modification
        # TODO: response = requests.post(...)
        pass
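For illustration only (not part of this commit): a minimal sketch of how the receiving side could decode the incremental diff stream produced by diff_chunker() above. Each record is an 8-byte big-endian offset, an 8-byte big-endian length, then that many bytes of data. The helper name apply_diff_stream and its file-like stream argument are hypothetical.

def apply_diff_stream(stream, image):
    # Read fixed 16-byte record headers until the stream is exhausted
    while True:
        header = stream.read(16)
        if len(header) < 16:
            break  # end of stream
        offset = int.from_bytes(header[:8], "big")
        length = int.from_bytes(header[8:16], "big")
        data = stream.read(length)
        # Write the changed extent into the target RBD image at the same offset
        image.write(data, offset)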