Add mostly complete implementation of VM send

Joshua Boniface 2024-09-29 01:31:13 -04:00
parent 8fa37d21c0
commit 34f0a2f388
4 changed files with 373 additions and 45 deletions

View File

@@ -3777,7 +3777,7 @@ class API_VM_Snapshot_Receive_Block(Resource):
"""
return api_helper.vm_snapshot_receive_block(
reqargs.get("pool"),
reqargs.get("volume") + "_recv",
reqargs.get("volume"),
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request.stream,
@@ -3788,6 +3788,82 @@
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
# /vm/<vm>/snapshot/receive/config
class API_VM_Snapshot_Receive_Config(Resource):
@RequestParser(
[
{
"name": "snapshot",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a VM configuration from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: snapshot
type: string
required: true
description: The name of the VM snapshot to receive
- in: query
name: source_snapshot
type: string
required: false
description: The name of the parent VM snapshot for incremental transfers
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_config(
reqargs.get("snapshot"),
flask.request.get_json(),
source_snapshot=reqargs.get("source_snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Config, "/vm/<vm>/snapshot/receive/config")
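For reference, the sending-side worker later in this commit invokes this internal endpoint with the snapshot name (and optional incremental parent) as query parameters and the full VM detail as the JSON body. A minimal standalone sketch of that call; the hostname, API key, and VM detail here are placeholders:

import requests

destination_api_uri = "https://pvc.destination.example/api/v1"  # placeholder
vm_detail = {}  # placeholder: full JSON dump of the VM, as returned by `pvc vm info`
response = requests.post(
    f"{destination_api_uri}/vm/myvm/snapshot/receive/config",
    headers={"X-Api-Key": "DESTINATION_API_KEY", "Content-Type": "application/json"},
    params={"snapshot": "snap1", "source_snapshot": "snap0"},  # omit source_snapshot for full sends
    json=vm_detail,
    verify=True,
)
response.raise_for_status()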
# /vm/autobackup
class API_VM_Autobackup_Root(Resource):
@RequestParser(

View File

@@ -21,7 +21,9 @@
import flask
import json
import logging
import lxml.etree as etree
import sys
from re import match
from requests import get
@@ -40,6 +42,15 @@ import daemon_lib.network as pvc_network
import daemon_lib.ceph as pvc_ceph
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
#
# Cluster base functions
#
@@ -1294,20 +1305,46 @@ def vm_flush_locks(zkhandler, vm):
return output, retcode
@ZKConnection(config)
def vm_snapshot_receive_block(
pool, volume, snapshot, size, stream, source_snapshot=None
zkhandler, pool, volume, snapshot, size, stream, source_snapshot=None
):
"""
Receive an RBD volume from a remote system
"""
try:
import rados
import rbd
_, rbd_detail = pvc_ceph.get_list_volume(
zkhandler, pool, limit=volume, is_fuzzy=False
)
if len(rbd_detail) > 0:
volume_exists = True
else:
volume_exists = False
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not source_snapshot:
if not source_snapshot and not volume_exists:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
retflag, retdata = pvc_ceph.add_volume(
zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
)
if not retflag:
ioctx.close()
cluster.shutdown()
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
image = rbd.Image(ioctx, volume)
@@ -1316,7 +1353,9 @@ def vm_snapshot_receive_block(
if source_snapshot:
# Receiving diff data
print(f"Applying diff between {source_snapshot} and {snapshot}")
logger.info(
f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}"
)
while True:
chunk = stream.read(chunk_size)
if not chunk:
@@ -1327,11 +1366,9 @@ def vm_snapshot_receive_block(
length = int.from_bytes(chunk[8:16], "big")
data = chunk[16 : 16 + length]
image.write(data, offset)
image.create_snap(snapshot)
else:
# Receiving full image
print(f"Importing full snapshot {snapshot}")
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
while True:
chunk = stream.read(chunk_size)
if not chunk:
@@ -1340,6 +1377,21 @@ def vm_snapshot_receive_block(
last_chunk += len(chunk)
image.create_snap(snapshot)
retflag, retdata = pvc_ceph.add_snapshot(
zkhandler, pool, volume, snapshot, zk_only=True
)
if not retflag:
image.close()
ioctx.close()
cluster.shutdown()
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
image.close()
ioctx.close()
@@ -1348,6 +1400,183 @@ def vm_snapshot_receive_block(
return {"message": f"Failed to import block device: {e}"}, 400
@ZKConnection(config)
def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None):
"""
Receive a VM configuration from a remote system
This function requires some explanation.
We get a full JSON dump of the VM configuration as provided by `pvc vm info`. This contains all the information we
reasonably need to replicate the VM at the given snapshot, including metainformation.
First, we need to determine whether this is an incremental or full send. If it is a full send and the VM already
exists, that is an error; however, that case should already have been caught when the RBD volumes were received.
"""
print(vm_config)
def parse_unified_diff(diff_text, original_text):
"""
Take a unified diff and apply it to an original string
"""
# Split the original string into lines
original_lines = original_text.splitlines(keepends=True)
patched_lines = []
original_idx = 0 # Track position in original lines
diff_lines = diff_text.splitlines(keepends=True)
for line in diff_lines:
if line.startswith("---") or line.startswith("+++"):
# Ignore prefix lines
continue
if line.startswith("@@"):
# Extract line numbers from the diff hunk header
hunk_header = line
parts = hunk_header.split(" ")
original_range = parts[1]
# Get the starting line number and range length for the original file
original_start, _ = map(int, original_range[1:].split(","))
# Adjust for zero-based indexing
original_start -= 1
# Add any lines between the current index and the next hunk's start
while original_idx < original_start:
patched_lines.append(original_lines[original_idx])
original_idx += 1
elif line.startswith("-"):
# This line should be removed from the original, skip it
original_idx += 1
elif line.startswith("+"):
# This line should be added to the patched version, removing the '+'
patched_lines.append(line[1:])
else:
# Context line (unchanged), it has no prefix, add from the original
patched_lines.append(original_lines[original_idx])
original_idx += 1
# Add any remaining lines from the original file after the last hunk
patched_lines.extend(original_lines[original_idx:])
return "".join(patched_lines).strip()
# Get our XML configuration for this snapshot
# We take the main XML configuration, then apply the diff for this particular incremental
current_snapshot = [s for s in vm_config["snapshots"] if s["name"] == snapshot][0]
vm_xml = vm_config["xml"]
vm_xml_diff = "\n".join(current_snapshot["xml_diff_lines"])
snapshot_vm_xml = parse_unified_diff(vm_xml_diff, vm_xml)
if (
source_snapshot is not None
or pvc_vm.searchClusterByUUID(zkhandler, vm_config["uuid"]) is not None
):
logger.info(
f"Receiving incremental VM configuration for {vm_config['name']}@{snapshot}"
)
# Modify the VM based on our passed detail
retcode, retmsg = pvc_vm.modify_vm(
zkhandler,
vm_config["uuid"],
False,
snapshot_vm_xml,
)
retcode, retmsg = pvc_vm.modify_vm_metadata(
zkhandler,
vm_config["uuid"],
None, # Node limits are left unchanged
vm_config["node_selector"],
vm_config["node_autostart"],
vm_config["profile"],
vm_config["migration_method"],
vm_config["migration_max_downtime"],
)
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
new_vm_tags = [t["name"] for t in vm_config["tags"]]
remove_tags = []
add_tags = []
for tag in vm_config["tags"]:
if tag["name"] not in current_vm_tags:
add_tags.append((tag["name"], tag["protected"]))
for tag in current_vm_tags:
if tag not in new_vm_tags:
remove_tags.append(tag)
for tag in add_tags:
name, protected = tag
pvc_vm.modify_vm_tag(
zkhandler, vm_config["uuid"], "add", name, protected=protected
)
for tag in remove_tags:
pvc_vm.modify_vm_tag(zkhandler, vm_config["uuid"], "remove", tag)
else:
logger.info(
f"Receiving full VM configuration for {vm_config['name']}@{snapshot}"
)
# Define the VM based on our passed detail
retcode, retmsg = pvc_vm.define_vm(
zkhandler,
snapshot_vm_xml,
None, # Target node is autoselected
None, # Node limits are invalid here so ignore them
vm_config["node_selector"],
vm_config["node_autostart"],
vm_config["migration_method"],
vm_config["migration_max_downtime"],
vm_config["profile"],
vm_config["tags"],
"mirror",
)
# Add this snapshot to the VM manually in Zookeeper
zkhandler.write(
[
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.name",
snapshot,
),
snapshot,
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.timestamp",
snapshot,
),
current_snapshot["timestamp"],
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.xml",
snapshot,
),
snapshot_vm_xml,
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.rbd_snapshots",
snapshot,
),
",".join(current_snapshot["rbd_snapshots"]),
),
]
)
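To summarize what vm_snapshot_receive_config consumes, here is a sketch of the minimum vm_config shape it relies on. The field names are taken from the accesses in the function above; every value is a placeholder:

vm_config_shape = {
    "name": "myvm",
    "uuid": "00000000-0000-0000-0000-000000000000",
    "xml": "<domain>...</domain>",  # current full libvirt XML
    "node_selector": "none",
    "node_autostart": False,
    "profile": "default",
    "migration_method": "live",
    "migration_max_downtime": 300,
    "tags": [{"name": "mytag", "protected": False}],
    "snapshots": [
        {
            "name": "snap1",
            "timestamp": 1727592000,
            "xml_diff_lines": ["--- current", "+++ snapshot", "@@ -1,1 +1,1 @@", "..."],
            "rbd_snapshots": ["vms/myvm_vda@snap1"],
        },
    ],
}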
#
# Network functions
#

View File

@@ -1168,11 +1168,14 @@ def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzz
continue
if target_volume and volume_name != target_volume:
continue
try:
snapshot_stats = json.loads(
zkhandler.read(
("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
)
)
except Exception:
snapshot_stats = []
if limit:
try:
if re.fullmatch(limit, snapshot_name):

View File

@@ -3184,6 +3184,8 @@ def vm_worker_send_snapshot(
)
return False
vm_name = vm_detail["name"]
# Validate that the destination cluster can be reached
destination_api_timeout = (3.05, 172800)
destination_api_headers = {
@@ -3267,6 +3269,11 @@ def vm_worker_send_snapshot(
verify=destination_api_verify_ssl,
)
destination_vm_status = response.json()
if len(destination_vm_status) > 0:
destination_vm_status = destination_vm_status[0]
else:
destination_vm_status = {}
current_destination_vm_state = destination_vm_status.get("state", None)
if (
current_destination_vm_state is not None
@@ -3351,7 +3358,7 @@ def vm_worker_send_snapshot(
# Begin send, set stages
total_stages = (
2
+ (2 * len(snapshot_rbdsnaps))
+ (3 * len(snapshot_rbdsnaps))
+ (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
)
@@ -3384,7 +3391,7 @@ def vm_worker_send_snapshot(
return False
try:
size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
_ = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
except Exception as e:
error_message = f"Failed to get volume size for {rbd_name}: {e}"
@@ -3395,7 +3402,7 @@ def vm_worker_send_snapshot(
current_stage += 1
update(
celery,
f"Creating remote volume {pool}/{volume} for {rbd_name}@{snapshot_name}",
f"Checking for remote volume {rbd_name}",
current=current_stage,
total=total_stages,
)
@@ -3416,26 +3423,6 @@ def vm_worker_send_snapshot(
)
return False
# Create the volume on the target
params = {
"size": size_bytes,
}
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=params,
data=None,
verify=destination_api_verify_ssl,
)
destination_volume_create_status = response.json()
if response.status_code != 200:
fail(
celery,
f"Failed to create volume {rbd_name} on target: {destination_volume_create_status['message']}",
)
return False
# Send the volume to the remote
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
@@ -3508,7 +3495,7 @@ def vm_worker_send_snapshot(
}
try:
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
@@ -3516,10 +3503,10 @@ def vm_worker_send_snapshot(
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
except Exception:
fail(
celery,
f"Failed to send snapshot: {e}",
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
@@ -3527,10 +3514,43 @@ def vm_worker_send_snapshot(
ioctx.close()
cluster.shutdown()
# Send the VM configuration
# if current_destination_vm_state is None:
# This is a new VM, so define it
# response = requests.post()
# else:
# This is a modification
# response = requests.post()
current_stage += 1
update(
celery,
f"Sending VM configuration for {vm_name}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
send_params = {
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
send_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/json",
}
try:
response = requests.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
json=vm_detail,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {e}",
)
return False
current_stage += 1
return finish(
celery,
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
current=current_stage,
total=total_stages,
)
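Taken together with the endpoints added in the first file, the worker's per-volume transfer amounts to a streamed POST against /vm/<vm>/snapshot/receive/block followed by the configuration POST shown above. A self-contained sketch of the block-transfer request shape; every value is a placeholder, and the octet-stream Content-Type is an assumption rather than something taken from this commit:

import requests

destination_api_uri = "https://pvc.destination.example/api/v1"  # placeholder

def stream_chunks():
    # Placeholder generator: the real worker streams RBD image data for a full
    # send, or offset/length/data-packed chunks for an incremental send.
    yield b"\x00" * 4096

response = requests.post(
    f"{destination_api_uri}/vm/myvm/snapshot/receive/block",
    headers={"X-Api-Key": "DESTINATION_API_KEY", "Content-Type": "application/octet-stream"},
    params={"pool": "vms", "volume": "myvm_vda", "snapshot": "snap1", "size": 10737418240},
    data=stream_chunks(),
    verify=True,
)
response.raise_for_status()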