Compare commits
3 Commits
fb8561cc5d...235299942a
| Author | SHA1 | Date |
|---|---|---|
| | 235299942a | |
| | 9aa32134a9 | |
| | 75eac356d5 | |
```diff
@@ -6794,7 +6794,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
           - storage / ceph
         parameters:
           - in: query
-            name: size
+            name: new_size
             type: string
             required: false
             description: The new volume size in bytes (or with a metric suffix, i.e. k/M/G/T); must be greater than the previous size (shrinking not supported)
```
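The volume-resize query parameter is renamed from `size` to `new_size`. As a rough client-side sketch, a resize call against this endpoint might look like the following; the path and the `new_size`/`force` parameters come from this changeset, while the base URI, port, and `X-Api-Key` header are assumptions for illustration only:

```python
import requests

# Hypothetical values: adjust the base URI and authentication to your deployment.
api_uri = "https://pvc.example.local:7370/api/v1"
pool, volume = "vms", "example-vm_disk0"

response = requests.put(
    f"{api_uri}/storage/ceph/volume/{pool}/{volume}",
    # The new size must be larger than the current size; shrinking is not supported.
    params={"new_size": "100G", "force": True},
    headers={"X-Api-Key": "<token>"},
)
response.raise_for_status()
```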
```diff
@@ -1346,7 +1346,7 @@ def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, requ
     image = rbd.Image(ioctx, volume)
 
     last_chunk = 0
-    chunk_size = 1024 * 1024 * 64
+    chunk_size = 1024 * 1024 * 1024
 
     logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
     while True:
```
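On the receive side, the full-block import now reads in 1 GiB chunks (1024 * 1024 * 1024 bytes) instead of 64 MiB, matching the larger sender-side chunk size below. A minimal sketch of the kind of chunked write loop this constant drives, assuming a readable stream and an already-opened RBD image; this is illustrative only, not the actual body of `vm_snapshot_receive_block_full`:

```python
# Sketch only: "reader" stands in for the HTTP request stream and "image"
# for the opened rbd.Image from the surrounding code.
def import_full_block(reader, image):
    last_chunk = 0
    chunk_size = 1024 * 1024 * 1024  # 1 GiB per read, up from 64 MiB

    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break
        image.write(chunk, last_chunk)   # write each chunk at its running offset
        last_chunk += len(chunk)
    return last_chunk
```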
```diff
@@ -3352,11 +3352,7 @@ def vm_worker_send_snapshot(
         return False
 
     # Begin send, set stages
-    total_stages = (
-        2
-        + (2 * len(snapshot_rbdsnaps))
-        + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
-    )
+    total_stages = 2 + (3 * len(snapshot_rbdsnaps))
 
     current_stage += 1
     update(
```
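The stage count collapses to a flat `2 + 3n` because the per-volume remote check (see the guard removed further down) now runs for every send, not only for new VM sends. A quick arithmetic check of the two formulas, using stand-in names (`n` for `len(snapshot_rbdsnaps)`, `is_new_send` for `current_destination_vm_state is None`):

```python
def old_total_stages(n, is_new_send):
    # 2 fixed stages, 2 per RBD snapshot, plus 1 extra per snapshot on a new send
    return 2 + (2 * n) + (n if is_new_send else 0)

def new_total_stages(n):
    # flat: 2 fixed stages plus 3 per RBD snapshot
    return 2 + (3 * n)

assert old_total_stages(3, True) == new_total_stages(3) == 11        # new VM send: unchanged
assert (old_total_stages(3, False), new_total_stages(3)) == (8, 11)  # re-send: check stages now counted
```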
```diff
@@ -3386,6 +3382,9 @@ def vm_worker_send_snapshot(
         return False
 
     # Create the block devices on the remote side if this is a new VM send
+    block_t_start = time.time()
+    block_total_mb = 0
+
     for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
         rbd_name = rbd_detail["name"]
         pool, volume = rbd_name.split("/")
```
```diff
@@ -3414,18 +3413,17 @@ def vm_worker_send_snapshot(
             return False
 
         try:
-            _ = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
+            local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
        except Exception as e:
            error_message = f"Failed to get volume size for {rbd_name}: {e}"
 
        if destination_storage_pool is not None:
            pool = destination_storage_pool
 
-        if current_destination_vm_state is None:
        current_stage += 1
        update(
            celery,
-            f"Checking for remote volume {rbd_name}",
+            f"Checking remote volume {rbd_name} for compliance",
            current=current_stage,
            total=total_stages,
        )
```
```diff
@@ -3436,20 +3434,42 @@ def vm_worker_send_snapshot(
             params=None,
             data=None,
         )
-        if response.status_code != 404:
+        if response.status_code != 404 and current_destination_vm_state is None:
             fail(
                 celery,
                 f"Remote storage pool {pool} already contains volume {volume}",
             )
             return False
 
+        if current_destination_vm_state is not None:
+            try:
+                remote_volume_size = ceph.format_bytes_fromhuman(
+                    response.json()[0]["stats"]["size"]
+                )
+            except Exception as e:
+                error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
+                fail(celery, error_message)
+                return False
+
+            if local_volume_size != remote_volume_size:
+                response = session.put(
+                    f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
+                    params={"new_size": local_volume_size, "force": True},
+                )
+                if response.status_code != 200:
+                    fail(
+                        celery,
+                        "Failed to resize remote volume to match local volume",
+                    )
+                    return False
+
         # Send the volume to the remote
         cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
         cluster.connect()
         ioctx = cluster.open_ioctx(pool)
         image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
         size = image.size()
-        chunk_size_mb = 64
+        chunk_size_mb = 1024
 
         if incremental_parent is not None:
             # Diff between incremental_parent and snapshot
```
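When re-sending to an existing VM, the new block compares the local volume size against the remote one and resizes the remote volume before streaming any data. A condensed restatement of that flow as a standalone sketch; `session` is assumed to be the authenticated `requests.Session` already used for the destination API, and both sizes are the byte values produced by `ceph.format_bytes_fromhuman`:

```python
def ensure_remote_volume_size(session, destination_api_uri, pool, volume,
                              local_volume_size, remote_volume_size):
    """Resize the remote volume to match the local one; return True on success."""
    if local_volume_size == remote_volume_size:
        # Sizes already match; nothing to do.
        return True

    # Grow the remote volume to the local size (shrinking is not supported).
    response = session.put(
        f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
        params={"new_size": local_volume_size, "force": True},
    )
    return response.status_code == 200
```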
```diff
@@ -3555,6 +3575,7 @@ def vm_worker_send_snapshot(
             image.diff_iterate(
                 0, size, incremental_parent, diff_cb_count, whole_object=True
             )
+            block_total_mb += total_chunks * 4
             image.diff_iterate(
                 0, size, incremental_parent, diff_cb_send, whole_object=True
             )
```
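For incremental sends, the transferred-size counter is derived from the counting pass: with `whole_object=True`, each extent reported by `diff_iterate` covers a whole RBD object, and the code credits 4 MB per changed object (the default RBD object size). A sketch of how such a counting callback might look with the `rbd` Python bindings; `diff_cb_count` and `total_chunks` are the names used in the diff, but their bodies here are assumptions:

```python
import rbd  # Ceph RBD Python bindings


def count_changed_objects(image, size, parent_snapshot):
    """Count changed whole objects between parent_snapshot and the open snapshot."""
    total_chunks = 0

    def diff_cb_count(offset, length, exists):
        # diff_iterate calls this once per changed extent; with whole_object=True
        # every extent corresponds to one RBD object.
        nonlocal total_chunks
        total_chunks += 1

    image.diff_iterate(0, size, parent_snapshot, diff_cb_count, whole_object=True)
    return total_chunks


# block_total_mb += count_changed_objects(image, size, incremental_parent) * 4
# (assumes the default 4 MiB RBD object size)
```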
```diff
@@ -3576,6 +3597,7 @@ def vm_worker_send_snapshot(
         else:
 
             def full_chunker():
+                nonlocal block_total_mb
                 chunk_size = 1024 * 1024 * chunk_size_mb
                 current_chunk = 0
                 last_chunk_time = time.time()
```
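`full_chunker()` is a nested generator, so it needs the `nonlocal` declaration to accumulate into the enclosing function's `block_total_mb` rather than shadowing it with a local name. A minimal self-contained illustration of the pattern, using generic names rather than the ones from the code:

```python
def send_blocks(blocks):
    total_mb = 0

    def chunker():
        # Without this declaration, "total_mb += ..." would raise
        # UnboundLocalError inside the generator.
        nonlocal total_mb
        for block in blocks:
            total_mb += len(block) / 1024 / 1024
            yield block

    consumed = b"".join(chunker())
    return total_mb, len(consumed)


print(send_blocks([b"\x00" * (1024 * 1024)] * 3))  # (3.0, 3145728)
```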
```diff
@@ -3583,6 +3605,7 @@ def vm_worker_send_snapshot(
                     chunk = image.read(current_chunk, chunk_size)
                     yield chunk
                     current_chunk += chunk_size
+                    block_total_mb += len(chunk) / 1024 / 1024
                     current_chunk_time = time.time()
                     chunk_time = current_chunk_time - last_chunk_time
                     last_chunk_time = current_chunk_time
```
```diff
@@ -3643,10 +3666,13 @@ def vm_worker_send_snapshot(
         ioctx.close()
         cluster.shutdown()
 
+    block_t_end = time.time()
+    block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
+
     current_stage += 1
     return finish(
         celery,
-        f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
+        f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
         current=current_stage,
         total=total_stages,
     )
```
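With the per-chunk accounting in place, the final stage now reports an average transfer rate: total megabytes sent divided by the wall-clock time between `block_t_start` and `block_t_end`, rounded to one decimal place. A worked example with illustrative numbers:

```python
block_t_start = 100.0   # illustrative timestamps, in seconds
block_t_end = 262.4
block_total_mb = 20480  # MB counted across all chunks/objects during the send

block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
print(f"average {block_mbps} MB/s")  # average 126.1 MB/s
```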