Improve incremental send speed
This commit is contained in:
parent
7fac7a62cf
commit
5f7aa0b2d6
|
@ -3467,7 +3467,7 @@ def vm_worker_send_snapshot(
|
||||||
if incremental_parent is not None:
|
if incremental_parent is not None:
|
||||||
# Diff between incremental_parent and snapshot
|
# Diff between incremental_parent and snapshot
|
||||||
celery_message = (
|
celery_message = (
|
||||||
f"Sending diff {incremental_parent}>{snapshot_name} for {rbd_name}"
|
f"Sending diff {incremental_parent} -> {snapshot_name} for {rbd_name}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Full image transfer
|
# Full image transfer
|
||||||
|
@ -3488,6 +3488,10 @@ def vm_worker_send_snapshot(
|
||||||
}
|
}
|
||||||
|
|
||||||
if incremental_parent is not None:
|
if incremental_parent is not None:
|
||||||
|
# Createa single session to reuse connections
|
||||||
|
session = requests.Session()
|
||||||
|
executor = ThreadPoolExecutor(max_workers=8)
|
||||||
|
|
||||||
send_params = {
|
send_params = {
|
||||||
"pool": pool,
|
"pool": pool,
|
||||||
"volume": volume,
|
"volume": volume,
|
||||||
|
@ -3495,36 +3499,47 @@ def vm_worker_send_snapshot(
|
||||||
"source_snapshot": incremental_parent,
|
"source_snapshot": incremental_parent,
|
||||||
}
|
}
|
||||||
|
|
||||||
last_chunk_time = time.time()
|
total_chunks = 0
|
||||||
|
|
||||||
|
def diff_cb_count(offset, length, exists):
|
||||||
|
nonlocal total_chunks
|
||||||
|
if exists:
|
||||||
|
total_chunks += 1
|
||||||
|
|
||||||
|
current_chunk = 0
|
||||||
|
|
||||||
|
def send_block(block):
|
||||||
|
response = session.put(
|
||||||
|
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=send_headers,
|
||||||
|
params=send_params,
|
||||||
|
data=block,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
def diff_cb_send(offset, length, exists):
|
def diff_cb_send(offset, length, exists):
|
||||||
nonlocal last_chunk_time
|
nonlocal current_chunk
|
||||||
if exists:
|
if exists:
|
||||||
data = image.read(offset, length)
|
data = image.read(offset, length)
|
||||||
block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
|
block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
|
||||||
response = requests.put(
|
|
||||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
|
||||||
timeout=destination_api_timeout,
|
|
||||||
headers=send_headers,
|
|
||||||
params=send_params,
|
|
||||||
data=block,
|
|
||||||
verify=destination_api_verify_ssl,
|
|
||||||
)
|
|
||||||
response.raise_for_status()
|
|
||||||
|
|
||||||
current_chunk_time = time.time()
|
executor.submit(send_block, block)
|
||||||
chunk_time = current_chunk_time - last_chunk_time
|
current_chunk += 1
|
||||||
last_chunk_time = current_chunk_time
|
|
||||||
chunk_speed = round(4 / chunk_time, 1)
|
|
||||||
update(
|
update(
|
||||||
celery,
|
celery,
|
||||||
celery_message + f" ({chunk_speed} MB/s)",
|
celery_message + f" ({current_chunk}/{total_chunks} objects)",
|
||||||
current=current_stage,
|
current=current_stage,
|
||||||
total=total_stages,
|
total=total_stages,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
image.set_snap(snapshot_name)
|
image.set_snap(snapshot_name)
|
||||||
|
image.diff_iterate(
|
||||||
|
0, size, incremental_parent, diff_cb_count, whole_object=True
|
||||||
|
)
|
||||||
image.diff_iterate(
|
image.diff_iterate(
|
||||||
0, size, incremental_parent, diff_cb_send, whole_object=True
|
0, size, incremental_parent, diff_cb_send, whole_object=True
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue