From fb8561cc5dda1209ce0b667409caf458df79a98b Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 30 Sep 2024 16:30:23 -0400 Subject: [PATCH] Actually fix incremental sending --- api-daemon/pvcapid/flaskapi.py | 4 +- api-daemon/pvcapid/helper.py | 37 ++++++--- daemon-common/vm.py | 139 ++++++++++++++++++--------------- 3 files changed, 103 insertions(+), 77 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index d3a4699c..e0eb11d4 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -3771,7 +3771,7 @@ class API_VM_Snapshot_Receive_Block(Resource): reqargs.get("volume"), reqargs.get("snapshot"), int(reqargs.get("size")), - flask.request.stream, + flask.request, ) @RequestParser( @@ -3846,7 +3846,7 @@ class API_VM_Snapshot_Receive_Block(Resource): reqargs.get("volume"), reqargs.get("snapshot"), reqargs.get("source_snapshot"), - flask.request.stream, + flask.request, ) @RequestParser( diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py index e9ab460f..6dfacb67 100755 --- a/api-daemon/pvcapid/helper.py +++ b/api-daemon/pvcapid/helper.py @@ -1306,7 +1306,7 @@ def vm_flush_locks(zkhandler, vm): @ZKConnection(config) -def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, stream): +def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, request): """ Receive an RBD volume from a remote system """ @@ -1350,7 +1350,7 @@ def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, stre logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}") while True: - chunk = flask.request.stream.read(chunk_size) + chunk = request.stream.read(chunk_size) if not chunk: break image.write(chunk, last_chunk) @@ -1360,10 +1360,12 @@ def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, stre ioctx.close() cluster.shutdown() + return {"message": "Successfully received RBD block device"}, 200 + @ZKConnection(config) def vm_snapshot_receive_block_diff( - zkhandler, pool, volume, snapshot, source_snapshot, stream + zkhandler, pool, volume, snapshot, source_snapshot, request ): """ Receive an RBD volume from a remote system @@ -1376,22 +1378,29 @@ def vm_snapshot_receive_block_diff( ioctx = cluster.open_ioctx(pool) image = rbd.Image(ioctx, volume) - logger.info( - f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}" - ) + if len(request.files) > 0: + logger.info(f"Applying {len(request.files)} RBD diff chunks for {snapshot}") - chunk = stream.read() - - # Extract the offset and length (8 bytes each) and the data - offset = int.from_bytes(chunk[:8], "big") - length = int.from_bytes(chunk[8:16], "big") - data = chunk[16 : 16 + length] - image.write(data, offset) + for i in range(len(request.files)): + object_key = f"object_{i}" + if object_key in request.files: + object_data = request.files[object_key].read() + offset = int.from_bytes(object_data[:8], "big") + length = int.from_bytes(object_data[8:16], "big") + data = object_data[16 : 16 + length] + logger.info(f"Applying RBD diff chunk at {offset} ({length} bytes)") + image.write(data, offset) + else: + return {"message": "No data received"}, 400 image.close() ioctx.close() cluster.shutdown() + return { + "message": f"Successfully received {len(request.files)} RBD diff chunks" + }, 200 + @ZKConnection(config) def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot): @@ -1423,6 +1432,8 @@ def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot): output = {"message": retdata.replace('"', "'")} return output, retcode + return {"message": "Successfully received VM configuration data"}, 200 + @ZKConnection(config) def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None): diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 8a35dfeb..b9a6bfc4 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -3190,18 +3190,20 @@ def vm_worker_send_snapshot( destination_api_timeout = (3.05, 172800) destination_api_headers = { "X-Api-Key": destination_api_key, - "Content-Type": "application/octet-stream", } + session = requests.Session() + session.headers.update(destination_api_headers) + session.verify = destination_api_verify_ssl + session.timeout = destination_api_timeout + try: # Hit the API root; this should return "PVC API version x" - response = requests.get( + response = session.get( f"{destination_api_uri}/", timeout=destination_api_timeout, - headers=None, params=None, data=None, - verify=destination_api_verify_ssl, ) if "PVC API" not in response.json().get("message"): raise ValueError("Remote API is not a PVC API or incorrect URI given") @@ -3225,13 +3227,10 @@ def vm_worker_send_snapshot( return False # Hit the API "/status" endpoint to validate API key and cluster status - response = requests.get( + response = session.get( f"{destination_api_uri}/status", - timeout=destination_api_timeout, - headers=destination_api_headers, params=None, data=None, - verify=destination_api_verify_ssl, ) destination_cluster_status = response.json() current_destination_pvc_version = destination_cluster_status.get( @@ -3260,13 +3259,10 @@ def vm_worker_send_snapshot( return False # Check if the VM already exists on the remote - response = requests.get( + response = session.get( f"{destination_api_uri}/vm/{domain}", - timeout=destination_api_timeout, - headers=destination_api_headers, params=None, data=None, - verify=destination_api_verify_ssl, ) destination_vm_status = response.json() if type(destination_vm_status) is list and len(destination_vm_status) > 0: @@ -3374,18 +3370,12 @@ def vm_worker_send_snapshot( "snapshot": snapshot_name, "source_snapshot": incremental_parent, } - send_headers = { - "X-Api-Key": destination_api_key, - "Content-Type": "application/json", - } try: - response = requests.post( + response = session.post( f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", - timeout=destination_api_timeout, - headers=send_headers, + headers={"Content-Type": "application/json"}, params=send_params, json=vm_detail, - verify=destination_api_verify_ssl, ) response.raise_for_status() except Exception as e: @@ -3441,13 +3431,10 @@ def vm_worker_send_snapshot( ) # Check if the volume exists on the target - response = requests.get( + response = session.get( f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", - timeout=destination_api_timeout, - headers=destination_api_headers, params=None, data=None, - verify=destination_api_verify_ssl, ) if response.status_code != 404: fail( @@ -3467,7 +3454,7 @@ def vm_worker_send_snapshot( if incremental_parent is not None: # Diff between incremental_parent and snapshot celery_message = ( - f"Sending diff {incremental_parent} -> {snapshot_name} for {rbd_name}" + f"Sending diff of {rbd_name}@{incremental_parent} → {snapshot_name}" ) else: # Full image transfer @@ -3481,17 +3468,8 @@ def vm_worker_send_snapshot( total=total_stages, ) - send_headers = { - "X-Api-Key": destination_api_key, - "Content-Type": "application/octet-stream", - "Transfer-Encoding": None, # Disable chunked transfer encoding - } - if incremental_parent is not None: # Createa single session to reuse connections - session = requests.Session() - executor = ThreadPoolExecutor(max_workers=8) - send_params = { "pool": pool, "volume": volume, @@ -3499,6 +3477,13 @@ def vm_worker_send_snapshot( "source_snapshot": incremental_parent, } + session.params.update(send_params) + + # Send 32 objects (128MB) at once + send_max_objects = 32 + batch_size_mb = 4 * send_max_objects + batch_size = batch_size_mb * 1024 * 1024 + total_chunks = 0 def diff_cb_count(offset, length, exists): @@ -3507,33 +3492,63 @@ def vm_worker_send_snapshot( total_chunks += 1 current_chunk = 0 + buffer = list() + buffer_size = 0 + last_chunk_time = time.time() - def send_block(block): - response = session.put( - f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", - timeout=destination_api_timeout, - headers=send_headers, - params=send_params, - data=block, - verify=destination_api_verify_ssl, + def send_batch_multipart(buffer): + nonlocal last_chunk_time + files = {} + for i in range(len(buffer)): + files[f"object_{i}"] = ( + f"object_{i}", + buffer[i], + "application/octet-stream", + ) + try: + response = session.put( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + files=files, + stream=True, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send diff batch ({e}): {response.json()['message']}", + ) + return False + + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(batch_size_mb / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, ) - response.raise_for_status() + + def add_block_to_multipart(buffer, offset, length, data): + part_data = ( + offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data + ) # Add header and data + buffer.append(part_data) def diff_cb_send(offset, length, exists): - nonlocal current_chunk + nonlocal current_chunk, buffer, buffer_size if exists: + # Read the data for the current block data = image.read(offset, length) - block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data - - executor.submit(send_block, block) + # Add the block to the multipart buffer + add_block_to_multipart(buffer, offset, length, data) current_chunk += 1 - - update( - celery, - celery_message + f" ({current_chunk}/{total_chunks} objects)", - current=current_stage, - total=total_stages, - ) + buffer_size += len(data) + if buffer_size >= batch_size: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size try: image.set_snap(snapshot_name) @@ -3543,6 +3558,11 @@ def vm_worker_send_snapshot( image.diff_iterate( 0, size, incremental_parent, diff_cb_send, whole_object=True ) + + if buffer: + send_batch_multipart(buffer) + buffer.clear() # Clear the buffer after sending + buffer_size = 0 # Reset buffer size except Exception: fail( celery, @@ -3583,13 +3603,11 @@ def vm_worker_send_snapshot( } try: - response = requests.post( + response = session.post( f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", - timeout=destination_api_timeout, - headers=send_headers, + headers={"Content-Type": "application/octet-stream"}, params=send_params, data=full_chunker(), - verify=destination_api_verify_ssl, ) response.raise_for_status() except Exception: @@ -3609,12 +3627,9 @@ def vm_worker_send_snapshot( "snapshot": snapshot_name, } try: - response = requests.patch( + response = session.patch( f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", - timeout=destination_api_timeout, - headers=send_headers, params=send_params, - verify=destination_api_verify_ssl, ) response.raise_for_status() except Exception: