Compare commits
3 Commits
fb8561cc5d
...
235299942a
Author | SHA1 | Date | |
---|---|---|---|
235299942a | |||
9aa32134a9 | |||
75eac356d5 |
@@ -6794,7 +6794,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||
- storage / ceph
|
||||
parameters:
|
||||
- in: query
|
||||
name: size
|
||||
name: new_size
|
||||
type: string
|
||||
required: false
|
||||
description: The new volume size in bytes (or with a metric suffix, i.e. k/M/G/T); must be greater than the previous size (shrinking not supported)
|
||||
|
@@ -1346,7 +1346,7 @@ def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, requ
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
last_chunk = 0
|
||||
chunk_size = 1024 * 1024 * 64
|
||||
chunk_size = 1024 * 1024 * 1024
|
||||
|
||||
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
|
||||
while True:
|
||||
|
@@ -3352,11 +3352,7 @@ def vm_worker_send_snapshot(
|
||||
return False
|
||||
|
||||
# Begin send, set stages
|
||||
total_stages = (
|
||||
2
|
||||
+ (2 * len(snapshot_rbdsnaps))
|
||||
+ (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
|
||||
)
|
||||
total_stages = 2 + (3 * len(snapshot_rbdsnaps))
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
@@ -3386,6 +3382,9 @@ def vm_worker_send_snapshot(
|
||||
return False
|
||||
|
||||
# Create the block devices on the remote side if this is a new VM send
|
||||
block_t_start = time.time()
|
||||
block_total_mb = 0
|
||||
|
||||
for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
|
||||
rbd_name = rbd_detail["name"]
|
||||
pool, volume = rbd_name.split("/")
|
||||
@@ -3414,42 +3413,63 @@ def vm_worker_send_snapshot(
|
||||
return False
|
||||
|
||||
try:
|
||||
_ = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||
except Exception as e:
|
||||
error_message = f"Failed to get volume size for {rbd_name}: {e}"
|
||||
|
||||
if destination_storage_pool is not None:
|
||||
pool = destination_storage_pool
|
||||
|
||||
if current_destination_vm_state is None:
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Checking for remote volume {rbd_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Checking remote volume {rbd_name} for compliance",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Check if the volume exists on the target
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
params=None,
|
||||
data=None,
|
||||
# Check if the volume exists on the target
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
params=None,
|
||||
data=None,
|
||||
)
|
||||
if response.status_code != 404 and current_destination_vm_state is None:
|
||||
fail(
|
||||
celery,
|
||||
f"Remote storage pool {pool} already contains volume {volume}",
|
||||
)
|
||||
if response.status_code != 404:
|
||||
fail(
|
||||
celery,
|
||||
f"Remote storage pool {pool} already contains volume {volume}",
|
||||
return False
|
||||
|
||||
if current_destination_vm_state is not None:
|
||||
try:
|
||||
remote_volume_size = ceph.format_bytes_fromhuman(
|
||||
response.json()[0]["stats"]["size"]
|
||||
)
|
||||
except Exception as e:
|
||||
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
|
||||
fail(celery, error_message)
|
||||
return False
|
||||
|
||||
if local_volume_size != remote_volume_size:
|
||||
response = session.put(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
params={"new_size": local_volume_size, "force": True},
|
||||
)
|
||||
if response.status_code != 200:
|
||||
fail(
|
||||
celery,
|
||||
"Failed to resize remote volume to match local volume",
|
||||
)
|
||||
return False
|
||||
|
||||
# Send the volume to the remote
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
|
||||
size = image.size()
|
||||
chunk_size_mb = 64
|
||||
chunk_size_mb = 1024
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Diff between incremental_parent and snapshot
|
||||
@@ -3555,6 +3575,7 @@ def vm_worker_send_snapshot(
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_count, whole_object=True
|
||||
)
|
||||
block_total_mb += total_chunks * 4
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_send, whole_object=True
|
||||
)
|
||||
@@ -3576,6 +3597,7 @@ def vm_worker_send_snapshot(
|
||||
else:
|
||||
|
||||
def full_chunker():
|
||||
nonlocal block_total_mb
|
||||
chunk_size = 1024 * 1024 * chunk_size_mb
|
||||
current_chunk = 0
|
||||
last_chunk_time = time.time()
|
||||
@@ -3583,6 +3605,7 @@ def vm_worker_send_snapshot(
|
||||
chunk = image.read(current_chunk, chunk_size)
|
||||
yield chunk
|
||||
current_chunk += chunk_size
|
||||
block_total_mb += len(chunk) / 1024 / 1024
|
||||
current_chunk_time = time.time()
|
||||
chunk_time = current_chunk_time - last_chunk_time
|
||||
last_chunk_time = current_chunk_time
|
||||
@@ -3643,10 +3666,13 @@ def vm_worker_send_snapshot(
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
block_t_end = time.time()
|
||||
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
|
||||
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery,
|
||||
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
|
||||
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
Reference in New Issue
Block a user