Finish working implementation of send/receive

Required some significant refactoring due to issues with the diff send,
but it works.
This commit is contained in:
Joshua Boniface 2024-09-30 02:31:06 -04:00
parent 34f0a2f388
commit 7785166a7e
3 changed files with 378 additions and 154 deletions

View File

@ -3717,16 +3717,12 @@ class API_VM_Snapshot_Receive_Block(Resource):
"name": "size",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
Receive a full snapshot of a single RBD volume from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
@ -3753,11 +3749,6 @@ class API_VM_Snapshot_Receive_Block(Resource):
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
- in: query
name: source_snapshot
type: string
required: false
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
responses:
200:
description: OK
@ -3775,13 +3766,151 @@ class API_VM_Snapshot_Receive_Block(Resource):
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block(
return api_helper.vm_snapshot_receive_block_full(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request.stream,
source_snapshot=reqargs.get("source_snapshot"),
)
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "source_snapshot",
"required": True,
},
]
)
@Authenticator
def put(self, vm, reqargs):
"""
Receive a single diff element from a snapshot of a single RBD volume from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: source_snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot parent
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block_diff(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
reqargs.get("source_snapshot"),
flask.request.stream,
)
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
]
)
@Authenticator
def patch(self, vm, reqargs):
"""
Create the block snapshot at snapshot of volume
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block_createsnap(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
)

View File

@ -1306,13 +1306,10 @@ def vm_flush_locks(zkhandler, vm):
@ZKConnection(config)
def vm_snapshot_receive_block(
zkhandler, pool, volume, snapshot, size, stream, source_snapshot=None
):
def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, stream):
"""
Receive an RBD volume from a remote system
"""
try:
import rados
import rbd
@ -1328,7 +1325,7 @@ def vm_snapshot_receive_block(
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not source_snapshot and not volume_exists:
if not volume_exists:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
retflag, retdata = pvc_ceph.add_volume(
@ -1349,25 +1346,8 @@ def vm_snapshot_receive_block(
image = rbd.Image(ioctx, volume)
last_chunk = 0
chunk_size = 1024 * 1024 * 128
chunk_size = 1024 * 1024 * 64
if source_snapshot:
# Receiving diff data
logger.info(
f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}"
)
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
# Extract the offset and length (8 bytes each) and the data
offset = int.from_bytes(chunk[:8], "big")
length = int.from_bytes(chunk[8:16], "big")
data = chunk[16 : 16 + length]
image.write(data, offset)
else:
# Receiving full image
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
while True:
chunk = flask.request.stream.read(chunk_size)
@ -1376,14 +1356,69 @@ def vm_snapshot_receive_block(
image.write(chunk, last_chunk)
last_chunk += len(chunk)
image.close()
ioctx.close()
cluster.shutdown()
@ZKConnection(config)
def vm_snapshot_receive_block_diff(
zkhandler, pool, volume, snapshot, source_snapshot, stream
):
"""
Receive an RBD volume from a remote system
"""
import rados
import rbd
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, volume)
logger.info(
f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}"
)
chunk = stream.read()
print(type(chunk))
print(len(chunk))
# Extract the offset and length (8 bytes each) and the data
offset = int.from_bytes(chunk[:8], "big")
length = int.from_bytes(chunk[8:16], "big")
data = chunk[16 : 16 + length]
print(f"Writing {length} bytes to {offset}")
written = image.write(data, offset)
print(f"Wrote {written} bytes")
image.close()
ioctx.close()
cluster.shutdown()
@ZKConnection(config)
def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
"""
Create the snapshot of a remote volume
"""
import rados
import rbd
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, volume)
image.create_snap(snapshot)
image.close()
ioctx.close()
cluster.shutdown()
retflag, retdata = pvc_ceph.add_snapshot(
zkhandler, pool, volume, snapshot, zk_only=True
)
if not retflag:
image.close()
ioctx.close()
cluster.shutdown()
if retflag:
retcode = 200
@ -1393,12 +1428,6 @@ def vm_snapshot_receive_block(
output = {"message": retdata.replace('"', "'")}
return output, retcode
image.close()
ioctx.close()
cluster.shutdown()
except Exception as e:
return {"message": f"Failed to import block device: {e}"}, 400
@ZKConnection(config)
def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None):

View File

@ -3269,7 +3269,7 @@ def vm_worker_send_snapshot(
verify=destination_api_verify_ssl,
)
destination_vm_status = response.json()
if len(destination_vm_status) > 0:
if type(destination_vm_status) is list and len(destination_vm_status) > 0:
destination_vm_status = destination_vm_status[0]
else:
destination_vm_status = {}
@ -3358,10 +3358,43 @@ def vm_worker_send_snapshot(
# Begin send, set stages
total_stages = (
2
+ (3 * len(snapshot_rbdsnaps))
+ (2 * len(snapshot_rbdsnaps))
+ (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
)
current_stage += 1
update(
celery,
f"Sending VM configuration for {vm_name}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
send_params = {
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
send_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/json",
}
try:
response = requests.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
json=vm_detail,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {e}",
)
return False
# Create the block devices on the remote side if this is a new VM send
for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
rbd_name = rbd_detail["name"]
@ -3429,30 +3462,85 @@ def vm_worker_send_snapshot(
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
size = image.size()
chunk_size_mb = 128
chunk_size_mb = 64
if incremental_parent is not None:
# Diff between incremental_parent and snapshot
celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}"
def diff_chunker():
def diff_cb(offset, length, exists):
"""Callback to handle diff regions"""
if exists:
data = image.read(offset, length)
yield (
offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
celery_message = (
f"Sending diff {incremental_parent}>{snapshot_name} for {rbd_name}"
)
image.set_snap(incremental_parent)
image.diff_iterate(0, size, incremental_parent, diff_cb)
data_stream = diff_chunker()
else:
# Full image transfer
celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
def chunker():
current_stage += 1
update(
celery,
celery_message,
current=current_stage,
total=total_stages,
)
send_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/octet-stream",
"Transfer-Encoding": None, # Disable chunked transfer encoding
}
if incremental_parent is not None:
send_params = {
"pool": pool,
"volume": volume,
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
last_chunk_time = time.time()
def diff_cb_send(offset, length, exists):
nonlocal last_chunk_time
if exists:
data = image.read(offset, length)
block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
response = requests.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
data=block,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
current_chunk_time = time.time()
chunk_time = current_chunk_time - last_chunk_time
last_chunk_time = current_chunk_time
chunk_speed = round(4 / chunk_time, 1)
update(
celery,
celery_message + f" ({chunk_speed} MB/s)",
current=current_stage,
total=total_stages,
)
try:
image.set_snap(snapshot_name)
image.diff_iterate(
0, size, incremental_parent, diff_cb_send, whole_object=True
)
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
else:
def full_chunker():
chunk_size = 1024 * 1024 * chunk_size_mb
current_chunk = 0
last_chunk_time = time.time()
@ -3471,16 +3559,6 @@ def vm_worker_send_snapshot(
total=total_stages,
)
data_stream = chunker()
current_stage += 1
update(
celery,
celery_message,
current=current_stage,
total=total_stages,
)
send_params = {
"pool": pool,
"volume": volume,
@ -3488,18 +3566,39 @@ def vm_worker_send_snapshot(
"size": size,
"source_snapshot": incremental_parent,
}
send_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/octet-stream",
"Transfer-Encoding": None, # Disable chunked transfer encoding
}
try:
response = requests.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
data=data_stream,
data=full_chunker(),
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
send_params = {
"pool": pool,
"volume": volume,
"snapshot": snapshot_name,
}
try:
response = requests.patch(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
@ -3514,39 +3613,6 @@ def vm_worker_send_snapshot(
ioctx.close()
cluster.shutdown()
current_stage += 1
update(
celery,
f"Sending VM configuration for {vm_name}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
send_params = {
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
send_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/json",
}
try:
response = requests.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
json=vm_detail,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {e}",
)
return False
current_stage += 1
return finish(
celery,