Add Ceph block receive (initial)

This commit is contained in:
Joshua Boniface 2024-09-11 00:51:07 -04:00
parent 7fe1262887
commit b1c4b2e928
2 changed files with 143 additions and 0 deletions

View File

@ -3572,6 +3572,97 @@ class API_VM_Snapshot_Import(Resource):
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
# /vm/<vm>/snapshot/receive/block
class API_VM_Snapshot_Receive_Block(Resource):
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "size",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: size
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
- in: query
name: source_snapshot
type: string
required: false
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block(
reqargs.get("pool"),
reqargs.get("volume") + "_recv",
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request.stream,
source_snapshot=reqargs.get("source_snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
# /vm/autobackup
class API_VM_Autobackup_Root(Resource):
@RequestParser(

View File

@ -1294,6 +1294,58 @@ def vm_flush_locks(zkhandler, vm):
return output, retcode
def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
try:
import rados
import rbd
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not source_snapshot:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
image = rbd.Image(ioctx, volume)
last_chunk = 0
chunk_size = 1024 * 1024 * 128
if source_snapshot:
# Receiving diff data
print(f"Applying diff between {source_snapshot} and {snapshot}")
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
# Extract the offset and length (8 bytes each) and the data
offset = int.from_bytes(chunk[:8], 'big')
length = int.from_bytes(chunk[8:16], 'big')
data = chunk[16:16 + length]
image.write(data, offset)
image.create_snap(snapshot)
else:
# Receiving full image
print(f"Importing full snapshot {snapshot}")
while True:
chunk = flask.request.stream.read(chunk_size)
if not chunk:
break
image.write(chunk, last_chunk)
last_chunk += len(chunk)
image.create_snap(snapshot)
image.close()
ioctx.close()
cluster.shutdown()
except Exception as e:
return {"message": f"Failed to import block device: {e}"}, 400
#
# Network functions
#