Compare commits
6 Commits
34f0a2f388
...
e756c6e568
Author | SHA1 | Date | |
---|---|---|---|
e756c6e568 | |||
5f7aa0b2d6 | |||
7fac7a62cf | |||
b19642aa2e | |||
974e0d6ac2 | |||
7785166a7e |
@@ -3717,16 +3717,12 @@ class API_VM_Snapshot_Receive_Block(Resource):
|
||||
"name": "size",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "source_snapshot",
|
||||
"required": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
|
||||
Receive a full snapshot of a single RBD volume from another PVC cluster
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
@@ -3753,11 +3749,6 @@ class API_VM_Snapshot_Receive_Block(Resource):
|
||||
type: integer
|
||||
required: true
|
||||
description: The size in bytes of the Ceph RBD volume
|
||||
- in: query
|
||||
name: source_snapshot
|
||||
type: string
|
||||
required: false
|
||||
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
@@ -3775,13 +3766,151 @@ class API_VM_Snapshot_Receive_Block(Resource):
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block(
|
||||
return api_helper.vm_snapshot_receive_block_full(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
int(reqargs.get("size")),
|
||||
flask.request.stream,
|
||||
source_snapshot=reqargs.get("source_snapshot"),
|
||||
flask.request,
|
||||
)
|
||||
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "pool",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "source_snapshot",
|
||||
"required": True,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def put(self, vm, reqargs):
|
||||
"""
|
||||
Receive a single diff element from a snapshot of a single RBD volume from another PVC cluster
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
- in: query
|
||||
name: source_snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot parent
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block_diff(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
reqargs.get("source_snapshot"),
|
||||
flask.request,
|
||||
)
|
||||
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "pool",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def patch(self, vm, reqargs):
|
||||
"""
|
||||
Create the block snapshot at snapshot of volume
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block_createsnap(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
)
|
||||
|
||||
|
||||
|
@@ -1306,82 +1306,32 @@ def vm_flush_locks(zkhandler, vm):
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block(
|
||||
zkhandler, pool, volume, snapshot, size, stream, source_snapshot=None
|
||||
):
|
||||
def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, request):
|
||||
"""
|
||||
Receive an RBD volume from a remote system
|
||||
"""
|
||||
try:
|
||||
import rados
|
||||
import rbd
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
_, rbd_detail = pvc_ceph.get_list_volume(
|
||||
zkhandler, pool, limit=volume, is_fuzzy=False
|
||||
)
|
||||
if len(rbd_detail) > 0:
|
||||
volume_exists = True
|
||||
else:
|
||||
volume_exists = False
|
||||
_, rbd_detail = pvc_ceph.get_list_volume(
|
||||
zkhandler, pool, limit=volume, is_fuzzy=False
|
||||
)
|
||||
if len(rbd_detail) > 0:
|
||||
volume_exists = True
|
||||
else:
|
||||
volume_exists = False
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
|
||||
if not source_snapshot and not volume_exists:
|
||||
rbd_inst = rbd.RBD()
|
||||
rbd_inst.create(ioctx, volume, size)
|
||||
retflag, retdata = pvc_ceph.add_volume(
|
||||
zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
|
||||
)
|
||||
if not retflag:
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
|
||||
output = {"message": retdata.replace('"', "'")}
|
||||
return output, retcode
|
||||
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
last_chunk = 0
|
||||
chunk_size = 1024 * 1024 * 128
|
||||
|
||||
if source_snapshot:
|
||||
# Receiving diff data
|
||||
logger.info(
|
||||
f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}"
|
||||
)
|
||||
while True:
|
||||
chunk = stream.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
|
||||
# Extract the offset and length (8 bytes each) and the data
|
||||
offset = int.from_bytes(chunk[:8], "big")
|
||||
length = int.from_bytes(chunk[8:16], "big")
|
||||
data = chunk[16 : 16 + length]
|
||||
image.write(data, offset)
|
||||
else:
|
||||
# Receiving full image
|
||||
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
|
||||
while True:
|
||||
chunk = flask.request.stream.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
image.write(chunk, last_chunk)
|
||||
last_chunk += len(chunk)
|
||||
|
||||
image.create_snap(snapshot)
|
||||
retflag, retdata = pvc_ceph.add_snapshot(
|
||||
zkhandler, pool, volume, snapshot, zk_only=True
|
||||
if not volume_exists:
|
||||
rbd_inst = rbd.RBD()
|
||||
rbd_inst.create(ioctx, volume, size)
|
||||
retflag, retdata = pvc_ceph.add_volume(
|
||||
zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
|
||||
)
|
||||
if not retflag:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
@@ -1393,11 +1343,96 @@ def vm_snapshot_receive_block(
|
||||
output = {"message": retdata.replace('"', "'")}
|
||||
return output, retcode
|
||||
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
except Exception as e:
|
||||
return {"message": f"Failed to import block device: {e}"}, 400
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
last_chunk = 0
|
||||
chunk_size = 1024 * 1024 * 64
|
||||
|
||||
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
|
||||
while True:
|
||||
chunk = request.stream.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
image.write(chunk, last_chunk)
|
||||
last_chunk += len(chunk)
|
||||
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
return {"message": "Successfully received RBD block device"}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block_diff(
|
||||
zkhandler, pool, volume, snapshot, source_snapshot, request
|
||||
):
|
||||
"""
|
||||
Receive an RBD volume from a remote system
|
||||
"""
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
if len(request.files) > 0:
|
||||
logger.info(f"Applying {len(request.files)} RBD diff chunks for {snapshot}")
|
||||
|
||||
for i in range(len(request.files)):
|
||||
object_key = f"object_{i}"
|
||||
if object_key in request.files:
|
||||
object_data = request.files[object_key].read()
|
||||
offset = int.from_bytes(object_data[:8], "big")
|
||||
length = int.from_bytes(object_data[8:16], "big")
|
||||
data = object_data[16 : 16 + length]
|
||||
logger.info(f"Applying RBD diff chunk at {offset} ({length} bytes)")
|
||||
image.write(data, offset)
|
||||
else:
|
||||
return {"message": "No data received"}, 400
|
||||
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
return {
|
||||
"message": f"Successfully received {len(request.files)} RBD diff chunks"
|
||||
}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
|
||||
"""
|
||||
Create the snapshot of a remote volume
|
||||
"""
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, volume)
|
||||
image.create_snap(snapshot)
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
retflag, retdata = pvc_ceph.add_snapshot(
|
||||
zkhandler, pool, volume, snapshot, zk_only=True
|
||||
)
|
||||
if not retflag:
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
|
||||
output = {"message": retdata.replace('"', "'")}
|
||||
return output, retcode
|
||||
|
||||
return {"message": "Successfully received VM configuration data"}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
|
@@ -107,7 +107,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
|
||||
|
||||
# Start following the task state, updating progress as we go
|
||||
total_task = task_status.get("total")
|
||||
with progressbar(length=total_task, show_eta=False) as bar:
|
||||
with progressbar(length=total_task, width=20, show_eta=False) as bar:
|
||||
last_task = 0
|
||||
maxlen = 21
|
||||
echo(
|
||||
|
@@ -83,7 +83,7 @@ class UploadProgressBar(object):
|
||||
else:
|
||||
self.end_suffix = ""
|
||||
|
||||
self.bar = click.progressbar(length=self.length, show_eta=True)
|
||||
self.bar = click.progressbar(length=self.length, width=20, show_eta=True)
|
||||
|
||||
def update(self, monitor):
|
||||
bytes_cur = monitor.bytes_read
|
||||
|
@@ -1092,17 +1092,17 @@ def rollback_snapshot(zkhandler, pool, volume, name):
|
||||
),
|
||||
)
|
||||
|
||||
# 1. Roll back the snapshot
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"rbd snap rollback {}/{}@{}".format(pool, volume, name)
|
||||
# 1. Roll back the snapshot
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"rbd snap rollback {}/{}@{}".format(pool, volume, name)
|
||||
)
|
||||
if retcode:
|
||||
return (
|
||||
False,
|
||||
'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
|
||||
volume, pool, name, stderr
|
||||
),
|
||||
)
|
||||
if retcode:
|
||||
return (
|
||||
False,
|
||||
'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
|
||||
volume, pool, name, stderr
|
||||
),
|
||||
)
|
||||
|
||||
return True, 'Rolled back RBD volume "{}" in pool "{}" to snapshot "{}".'.format(
|
||||
volume, pool, name
|
||||
|
@@ -3190,18 +3190,20 @@ def vm_worker_send_snapshot(
|
||||
destination_api_timeout = (3.05, 172800)
|
||||
destination_api_headers = {
|
||||
"X-Api-Key": destination_api_key,
|
||||
"Content-Type": "application/octet-stream",
|
||||
}
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update(destination_api_headers)
|
||||
session.verify = destination_api_verify_ssl
|
||||
session.timeout = destination_api_timeout
|
||||
|
||||
try:
|
||||
# Hit the API root; this should return "PVC API version x"
|
||||
response = requests.get(
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/",
|
||||
timeout=destination_api_timeout,
|
||||
headers=None,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
if "PVC API" not in response.json().get("message"):
|
||||
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
||||
@@ -3225,13 +3227,10 @@ def vm_worker_send_snapshot(
|
||||
return False
|
||||
|
||||
# Hit the API "/status" endpoint to validate API key and cluster status
|
||||
response = requests.get(
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/status",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
destination_cluster_status = response.json()
|
||||
current_destination_pvc_version = destination_cluster_status.get(
|
||||
@@ -3260,16 +3259,13 @@ def vm_worker_send_snapshot(
|
||||
return False
|
||||
|
||||
# Check if the VM already exists on the remote
|
||||
response = requests.get(
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/vm/{domain}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
destination_vm_status = response.json()
|
||||
if len(destination_vm_status) > 0:
|
||||
if type(destination_vm_status) is list and len(destination_vm_status) > 0:
|
||||
destination_vm_status = destination_vm_status[0]
|
||||
else:
|
||||
destination_vm_status = {}
|
||||
@@ -3358,10 +3354,37 @@ def vm_worker_send_snapshot(
|
||||
# Begin send, set stages
|
||||
total_stages = (
|
||||
2
|
||||
+ (3 * len(snapshot_rbdsnaps))
|
||||
+ (2 * len(snapshot_rbdsnaps))
|
||||
+ (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
|
||||
)
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Sending VM configuration for {vm_name}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
send_params = {
|
||||
"snapshot": snapshot_name,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
try:
|
||||
response = session.post(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
|
||||
headers={"Content-Type": "application/json"},
|
||||
params=send_params,
|
||||
json=vm_detail,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send config: {e}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Create the block devices on the remote side if this is a new VM send
|
||||
for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
|
||||
rbd_name = rbd_detail["name"]
|
||||
@@ -3408,13 +3431,10 @@ def vm_worker_send_snapshot(
|
||||
)
|
||||
|
||||
# Check if the volume exists on the target
|
||||
response = requests.get(
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
if response.status_code != 404:
|
||||
fail(
|
||||
@@ -3429,30 +3449,133 @@ def vm_worker_send_snapshot(
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
|
||||
size = image.size()
|
||||
chunk_size_mb = 128
|
||||
chunk_size_mb = 64
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Diff between incremental_parent and snapshot
|
||||
celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}"
|
||||
|
||||
def diff_chunker():
|
||||
def diff_cb(offset, length, exists):
|
||||
"""Callback to handle diff regions"""
|
||||
if exists:
|
||||
data = image.read(offset, length)
|
||||
yield (
|
||||
offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
|
||||
)
|
||||
|
||||
image.set_snap(incremental_parent)
|
||||
image.diff_iterate(0, size, incremental_parent, diff_cb)
|
||||
|
||||
data_stream = diff_chunker()
|
||||
celery_message = (
|
||||
f"Sending diff of {rbd_name}@{incremental_parent}→{snapshot_name}"
|
||||
)
|
||||
else:
|
||||
# Full image transfer
|
||||
celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
|
||||
|
||||
def chunker():
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
celery_message,
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Createa single session to reuse connections
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
|
||||
session.params.update(send_params)
|
||||
|
||||
# Send 32 objects (128MB) at once
|
||||
send_max_objects = 32
|
||||
batch_size_mb = 4 * send_max_objects
|
||||
batch_size = batch_size_mb * 1024 * 1024
|
||||
|
||||
total_chunks = 0
|
||||
|
||||
def diff_cb_count(offset, length, exists):
|
||||
nonlocal total_chunks
|
||||
if exists:
|
||||
total_chunks += 1
|
||||
|
||||
current_chunk = 0
|
||||
buffer = list()
|
||||
buffer_size = 0
|
||||
last_chunk_time = time.time()
|
||||
|
||||
def send_batch_multipart(buffer):
|
||||
nonlocal last_chunk_time
|
||||
files = {}
|
||||
for i in range(len(buffer)):
|
||||
files[f"object_{i}"] = (
|
||||
f"object_{i}",
|
||||
buffer[i],
|
||||
"application/octet-stream",
|
||||
)
|
||||
try:
|
||||
response = session.put(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
files=files,
|
||||
stream=True,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send diff batch ({e}): {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
|
||||
current_chunk_time = time.time()
|
||||
chunk_time = current_chunk_time - last_chunk_time
|
||||
last_chunk_time = current_chunk_time
|
||||
chunk_speed = round(batch_size_mb / chunk_time, 1)
|
||||
update(
|
||||
celery,
|
||||
celery_message + f" ({chunk_speed} MB/s)",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
def add_block_to_multipart(buffer, offset, length, data):
|
||||
part_data = (
|
||||
offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
|
||||
) # Add header and data
|
||||
buffer.append(part_data)
|
||||
|
||||
def diff_cb_send(offset, length, exists):
|
||||
nonlocal current_chunk, buffer, buffer_size
|
||||
if exists:
|
||||
# Read the data for the current block
|
||||
data = image.read(offset, length)
|
||||
# Add the block to the multipart buffer
|
||||
add_block_to_multipart(buffer, offset, length, data)
|
||||
current_chunk += 1
|
||||
buffer_size += len(data)
|
||||
if buffer_size >= batch_size:
|
||||
send_batch_multipart(buffer)
|
||||
buffer.clear() # Clear the buffer after sending
|
||||
buffer_size = 0 # Reset buffer size
|
||||
|
||||
try:
|
||||
image.set_snap(snapshot_name)
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_count, whole_object=True
|
||||
)
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_send, whole_object=True
|
||||
)
|
||||
|
||||
if buffer:
|
||||
send_batch_multipart(buffer)
|
||||
buffer.clear() # Clear the buffer after sending
|
||||
buffer_size = 0 # Reset buffer size
|
||||
except Exception:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
else:
|
||||
|
||||
def full_chunker():
|
||||
chunk_size = 1024 * 1024 * chunk_size_mb
|
||||
current_chunk = 0
|
||||
last_chunk_time = time.time()
|
||||
@@ -3471,36 +3594,42 @@ def vm_worker_send_snapshot(
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
data_stream = chunker()
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
"size": size,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
celery_message,
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
try:
|
||||
response = session.post(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
params=send_params,
|
||||
data=full_chunker(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
"size": size,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
send_headers = {
|
||||
"X-Api-Key": destination_api_key,
|
||||
"Content-Type": "application/octet-stream",
|
||||
"Transfer-Encoding": None, # Disable chunked transfer encoding
|
||||
}
|
||||
try:
|
||||
response = requests.post(
|
||||
response = session.patch(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
timeout=destination_api_timeout,
|
||||
headers=send_headers,
|
||||
params=send_params,
|
||||
data=data_stream,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception:
|
||||
@@ -3514,39 +3643,6 @@ def vm_worker_send_snapshot(
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Sending VM configuration for {vm_name}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
send_params = {
|
||||
"snapshot": snapshot_name,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
send_headers = {
|
||||
"X-Api-Key": destination_api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
|
||||
timeout=destination_api_timeout,
|
||||
headers=send_headers,
|
||||
params=send_params,
|
||||
json=vm_detail,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send config: {e}",
|
||||
)
|
||||
return False
|
||||
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery,
|
||||
|
Reference in New Issue
Block a user