From 5f7aa0b2d6b0fbeaf3862014a9962c90abaa388b Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 30 Sep 2024 04:15:17 -0400 Subject: [PATCH] Improve incremental send speed --- daemon-common/vm.py | 49 +++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 97addd91..8a35dfeb 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -3467,7 +3467,7 @@ def vm_worker_send_snapshot( if incremental_parent is not None: # Diff between incremental_parent and snapshot celery_message = ( - f"Sending diff {incremental_parent}>{snapshot_name} for {rbd_name}" + f"Sending diff {incremental_parent} -> {snapshot_name} for {rbd_name}" ) else: # Full image transfer @@ -3488,6 +3488,10 @@ def vm_worker_send_snapshot( } if incremental_parent is not None: + # Createa single session to reuse connections + session = requests.Session() + executor = ThreadPoolExecutor(max_workers=8) + send_params = { "pool": pool, "volume": volume, @@ -3495,36 +3499,47 @@ def vm_worker_send_snapshot( "source_snapshot": incremental_parent, } - last_chunk_time = time.time() + total_chunks = 0 + + def diff_cb_count(offset, length, exists): + nonlocal total_chunks + if exists: + total_chunks += 1 + + current_chunk = 0 + + def send_block(block): + response = session.put( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + timeout=destination_api_timeout, + headers=send_headers, + params=send_params, + data=block, + verify=destination_api_verify_ssl, + ) + response.raise_for_status() def diff_cb_send(offset, length, exists): - nonlocal last_chunk_time + nonlocal current_chunk if exists: data = image.read(offset, length) block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data - response = requests.put( - f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", - timeout=destination_api_timeout, - headers=send_headers, - params=send_params, - data=block, - verify=destination_api_verify_ssl, - ) - response.raise_for_status() - current_chunk_time = time.time() - chunk_time = current_chunk_time - last_chunk_time - last_chunk_time = current_chunk_time - chunk_speed = round(4 / chunk_time, 1) + executor.submit(send_block, block) + current_chunk += 1 + update( celery, - celery_message + f" ({chunk_speed} MB/s)", + celery_message + f" ({current_chunk}/{total_chunks} objects)", current=current_stage, total=total_stages, ) try: image.set_snap(snapshot_name) + image.diff_iterate( + 0, size, incremental_parent, diff_cb_count, whole_object=True + ) image.diff_iterate( 0, size, incremental_parent, diff_cb_send, whole_object=True )