Compare commits


No commits in common. "df4d437d31e548743c2e60ec1fb6c552826d15e4" and "7fe1262887f98791dbd6b0111b204ab2c796c276" have entirely different histories.

12 changed files with 109 additions and 1633 deletions

View File

@@ -1985,7 +1985,7 @@ class API_VM_Root(Resource):
    @Authenticator
    def post(self, reqargs):
        """
-        Define/create a new virtual machine
+        Create a new virtual machine
        ---
        tags:
          - vm
@@ -2143,10 +2143,8 @@ class API_VM_Element(Resource):
    @Authenticator
    def post(self, vm, reqargs):
        """
-        Define/create a new virtual machine
+        Create new {vm}
        Note: The name {vm} is ignored; only the "name" value from the Libvirt XML is used
        This endpoint is identical to "POST /api/v1/vm"
        ---
        tags:
@@ -2803,22 +2801,10 @@ class API_VM_Locks(Resource):
          - vm
        responses:
          202:
-            description: Accepted
+            description: OK
            schema:
-              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
-              properties:
-                task_id:
-                  type: string
-                  description: The internal Celery job ID of the task
-                task_name:
-                  type: string
-                  description: The internal Celery job task name
-                run_on:
-                  type: string
-                  description: The worker daemon instance assigned to run the task
+              type: string
+              description: The Celery job ID of the task
        """
        vm_node_detail, retcode = api_helper.vm_node(vm)
        if retcode == 200:
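The hunks above and below swap the newer asynchronous response schema (202 Accepted with a CeleryTask object) for the older 200/Message or bare-string schemas. A minimal client sketch of the newer 202 pattern, with placeholder URI, key, and VM name; the Location header points at the task status endpoint, matching the return tuple visible further down in this diff:

import time
import requests

API_URI = "https://pvc.local:7370/api/v1"   # placeholder
HEADERS = {"X-Api-Key": "secret"}           # placeholder

# Asynchronous endpoints on the newer side reply 202 with CeleryTask info
resp = requests.post(f"{API_URI}/vm/testvm/locks", headers=HEADERS)
task = resp.json()  # {"task_id": ..., "task_name": ..., "run_on": ...}

# Poll the task endpoint named in the Location header until it finishes
while True:
    status = requests.get(resp.headers["Location"], headers=HEADERS).json()
    if status.get("state") in ("SUCCESS", "FAILURE"):
        break
    time.sleep(0.5)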
@@ -2938,12 +2924,11 @@ class API_VM_Device(Resource):
            required: true
            description: The raw Libvirt XML definition of the device to attach
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
@@ -2993,12 +2978,11 @@ class API_VM_Device(Resource):
            required: true
            description: The raw Libvirt XML definition of the device to detach
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
@@ -3250,12 +3234,11 @@ class API_VM_Snapshot(Resource):
            required: false
            description: A custom name for the snapshot instead of autogeneration by date
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Execution error
            schema:
@@ -3309,12 +3292,11 @@ class API_VM_Snapshot(Resource):
            required: true
            description: The name of the snapshot to remove
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Execution error
            schema:
@@ -3374,12 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
            required: true
            description: The name of the snapshot to roll back to
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Execution error
            schema:
@@ -3455,16 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
            description: The absolute file path to export the snapshot to on the active primary coordinator
          - in: query
            name: incremental_parent
-            type: string
+            type: boolean
            required: false
            description: A snapshot name to generate an incremental diff from
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Execution error
            schema:
@@ -3549,12 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
            default: true
            description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Execution error
            schema:
@@ -3593,428 +3572,6 @@ class API_VM_Snapshot_Import(Resource):
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
# /vm/<vm>/snapshot/send
class API_VM_Snapshot_Send(Resource):
@RequestParser(
[
{
"name": "snapshot_name",
"required": True,
"helptext": "A snapshot name must be specified",
},
{
"name": "destination_api_uri",
"required": True,
"helptext": "A destination API URI must be specified",
},
{
"name": "destination_api_key",
"required": True,
"helptext": "A destination API key must be specified",
},
{
"name": "destination_api_verify_ssl",
"required": False,
},
{
"name": "incremental_parent",
"required": False,
},
{
"name": "destination_storage_pool",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Send a snapshot of a VM's disks and configuration to another PVC cluster
---
tags:
- vm
parameters:
- in: query
name: snapshot_name
type: string
required: true
description: The name of the snapshot to export (must exist)
- in: query
name: destination_api_uri
type: string
required: true
description: The base API URI of the destination PVC cluster (with prefix if applicable)
- in: query
name: destination_api_key
type: string
required: true
description: The API authentication key of the destination PVC cluster
- in: query
name: destination_api_verify_ssl
type: boolean
required: false
default: true
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
- in: query
name: incremental_parent
type: string
required: false
description: A snapshot name to generate an incremental diff from; a full send is performed if unset
- in: query
name: destination_storage_pool
type: string
required: false
default: source storage pool name
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
responses:
202:
description: Accepted
schema:
type: object
description: The Celery job information of the task
id: CeleryTask
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
snapshot_name = reqargs.get("snapshot_name", None)
destination_api_uri = reqargs.get("destination_api_uri", None)
destination_api_key = reqargs.get("destination_api_key", None)
destination_api_verify_ssl = bool(
strtobool(reqargs.get("destination_api_verify_ssl", "true"))
)
incremental_parent = reqargs.get("incremental_parent", None)
destination_storage_pool = reqargs.get("destination_storage_pool", None)
task = run_celery_task(
"vm.send_snapshot",
domain=vm,
snapshot_name=snapshot_name,
destination_api_uri=destination_api_uri,
destination_api_key=destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
run_on="primary",
)
return (
{
"task_id": task.id,
"task_name": "vm.send_snapshot",
"run_on": f"{get_primary_node()} (primary)",
},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
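A hedged sketch of how a client might call the send endpoint removed above, using only the query parameters its docstring documents (cluster URIs, keys, and names here are placeholders):

import requests

params = {
    "snapshot_name": "snap20240101",            # must already exist on the source
    "destination_api_uri": "https://remote.cluster:7370/api/v1",
    "destination_api_key": "remote-secret",
    "destination_api_verify_ssl": "true",
    # "incremental_parent": "snap20231201",     # optional: incremental send
    # "destination_storage_pool": "vms",        # optional: override the remote pool
}
resp = requests.post(
    "https://source.cluster:7370/api/v1/vm/testvm/snapshot/send",
    headers={"X-Api-Key": "source-secret"},
    params=params,
)
# Expect 202 Accepted with the Celery task detail and a Location header
print(resp.status_code, resp.json())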
# /vm/<vm>/snapshot/receive/block
class API_VM_Snapshot_Receive_Block(Resource):
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "size",
"required": True,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a full snapshot of a single RBD volume from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: size
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block_full(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request,
)
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "source_snapshot",
"required": True,
},
]
)
@Authenticator
def put(self, vm, reqargs):
"""
Receive a single diff element from a snapshot of a single RBD volume from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: source_snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot parent
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block_diff(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
reqargs.get("source_snapshot"),
flask.request,
)
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
]
)
@Authenticator
def patch(self, vm, reqargs):
"""
Create the block snapshot at snapshot of volume
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block_createsnap(
reqargs.get("pool"),
reqargs.get("volume"),
reqargs.get("snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
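Each multipart object accepted by the PUT handler above is framed as an 8-byte big-endian offset, an 8-byte big-endian length, then the raw block data; the sender-side add_block_to_multipart() later in this diff builds the same framing. A standalone round-trip sketch:

def pack_block(offset: int, length: int, data: bytes) -> bytes:
    # 8-byte big-endian offset, 8-byte big-endian length, then the payload,
    # matching the receive logic above and add_block_to_multipart() below
    return offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data

def unpack_block(blob: bytes):
    offset = int.from_bytes(blob[:8], "big")
    length = int.from_bytes(blob[8:16], "big")
    return offset, length, blob[16 : 16 + length]

assert unpack_block(pack_block(4096, 5, b"hello")) == (4096, 5, b"hello")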
# /vm/<vm>/snapshot/receive/config
class API_VM_Snapshot_Receive_Config(Resource):
@RequestParser(
[
{
"name": "snapshot",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a VM configuration from another PVC cluster
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: size
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
- in: query
name: source_snapshot
type: string
required: false
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_config(
reqargs.get("snapshot"),
flask.request.get_json(),
source_snapshot=reqargs.get("source_snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Config, "/vm/<vm>/snapshot/receive/config")
# /vm/autobackup
class API_VM_Autobackup_Root(Resource):
    @RequestParser(
@@ -4044,12 +3601,14 @@ class API_VM_Autobackup_Root(Resource):
            type: string
            example: "user@domain.tld"
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              properties:
+                task_id:
+                  type: string
+                  description: Task ID for the provisioner Celery worker
          400:
            description: Bad request
            schema:
@@ -5535,17 +5094,11 @@ class API_Storage_Ceph_Benchmark(Resource):
            required: false
            description: An optional override name for the job
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
-              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
-          400:
-            description: Bad request
-            schema:
-              type: object
-              id: Message
+              type: string
+              description: The Celery job ID of the benchmark (unused elsewhere)
        """
        # Verify that the pool is valid
        _list, code = api_helper.ceph_pool_list(
@@ -5672,12 +5225,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
            required: true
            description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
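The "detect:NAME:SIZE:ID" strings accepted by the OSD endpoints here select a disk by traits rather than by device path. A hedged sketch of splitting such a string into its fields (the actual hardware matching happens on the node and is not shown; the example string is invented):

def parse_detect_string(detect: str) -> dict:
    # Expected form: "detect:NAME:SIZE:ID", e.g. "detect:INTEL:800GB:1"
    tag, name, size, idx = detect.split(":", 3)
    if tag != "detect":
        raise ValueError("not a detect string")
    return {"name": name, "size": size, "id": idx}

print(parse_detect_string("detect:INTEL:800GB:1"))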
@@ -5872,12 +5424,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
            required: false
            description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
@@ -5988,12 +5539,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
            required: false
            description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
@@ -6047,12 +5597,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
            required: true
            description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          400:
            description: Bad request
            schema:
@@ -6106,7 +5655,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
          - in: query
            name: force
            type: boolean
-            required: false
+            required: flase
            description: Force removal even if some step(s) fail
          - in: query
            name: yes-i-really-mean-it
@@ -6114,12 +5663,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
            required: true
            description: A confirmation string to ensure that the API consumer really means it
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              id: Message
          404:
            description: Not found
            schema:
@@ -6707,7 +6255,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
            name: force
            type: boolean
            required: false
-            default: false
+            default: flase
            description: Force action if volume creation would violate 80% full soft cap on the pool
        responses:
          200:
@@ -6784,7 +6332,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
            name: force
            type: boolean
            required: false
-            default: false
+            default: flase
            description: Force action if volume creation would violate 80% full soft cap on the pool
        responses:
          200:
@@ -6823,7 +6371,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
          - storage / ceph
        parameters:
          - in: query
-            name: new_size
+            name: size
            type: string
            required: false
            description: The new volume size in bytes (or with a metric suffix, i.e. k/M/G/T); must be greater than the previous size (shrinking not supported)
@@ -6836,7 +6384,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
            name: force
            type: boolean
            required: false
-            default: false
+            default: flase
            description: Force action if new volume size would violate 80% full soft cap on the pool
        responses:
          200:
@@ -6961,7 +6509,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
            name: force
            type: boolean
            required: false
-            default: false
+            default: flase
            description: Force action if clone volume size would violate 80% full soft cap on the pool
        responses:
          200:
@@ -9816,12 +9364,14 @@ class API_Provisioner_Create_Root(Resource):
            type: string
            description: Script install() function keyword argument in "arg=data" format; may be specified multiple times to add multiple arguments
        responses:
-          202:
-            description: Accepted
+          200:
+            description: OK
            schema:
              type: object
-              description: The Celery job information of the task
-              id: CeleryTask
+              properties:
+                task_id:
+                  type: string
+                  description: Task ID for the provisioner Celery worker
          400:
            description: Bad request
            schema:

View File

@@ -21,9 +21,7 @@
import flask
import json
-import logging
import lxml.etree as etree
-import sys
from re import match
from requests import get
@@ -42,15 +40,6 @@ import daemon_lib.network as pvc_network
import daemon_lib.ceph as pvc_ceph
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-handler = logging.StreamHandler(sys.stdout)
-handler.setLevel(logging.INFO)
-formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
-handler.setFormatter(formatter)
-logger.addHandler(handler)
#
# Cluster base functions
#
@@ -1291,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
        zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
    )
-    if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
+    if retdata[0].get("state") not in ["stop", "disable"]:
        return {"message": "VM must be stopped to flush locks"}, 400
    retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
@@ -1305,313 +1294,6 @@ def vm_flush_locks(zkhandler, vm):
    return output, retcode
@ZKConnection(config)
def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, request):
"""
Receive an RBD volume from a remote system
"""
import rados
import rbd
_, rbd_detail = pvc_ceph.get_list_volume(
zkhandler, pool, limit=volume, is_fuzzy=False
)
if len(rbd_detail) > 0:
volume_exists = True
else:
volume_exists = False
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not volume_exists:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
retflag, retdata = pvc_ceph.add_volume(
zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
)
if not retflag:
ioctx.close()
cluster.shutdown()
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
image = rbd.Image(ioctx, volume)
last_chunk = 0
chunk_size = 1024 * 1024 * 1024
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
while True:
chunk = request.stream.read(chunk_size)
if not chunk:
break
image.write(chunk, last_chunk)
last_chunk += len(chunk)
image.close()
ioctx.close()
cluster.shutdown()
return {"message": "Successfully received RBD block device"}, 200
@ZKConnection(config)
def vm_snapshot_receive_block_diff(
zkhandler, pool, volume, snapshot, source_snapshot, request
):
"""
Receive an RBD volume from a remote system
"""
import rados
import rbd
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, volume)
if len(request.files) > 0:
logger.info(f"Applying {len(request.files)} RBD diff chunks for {snapshot}")
for i in range(len(request.files)):
object_key = f"object_{i}"
if object_key in request.files:
object_data = request.files[object_key].read()
offset = int.from_bytes(object_data[:8], "big")
length = int.from_bytes(object_data[8:16], "big")
data = object_data[16 : 16 + length]
logger.info(f"Applying RBD diff chunk at {offset} ({length} bytes)")
image.write(data, offset)
else:
return {"message": "No data received"}, 400
image.close()
ioctx.close()
cluster.shutdown()
return {
"message": f"Successfully received {len(request.files)} RBD diff chunks"
}, 200
@ZKConnection(config)
def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
"""
Create the snapshot of a remote volume
"""
import rados
import rbd
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, volume)
image.create_snap(snapshot)
image.close()
ioctx.close()
cluster.shutdown()
retflag, retdata = pvc_ceph.add_snapshot(
zkhandler, pool, volume, snapshot, zk_only=True
)
if not retflag:
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
return {"message": "Successfully received VM configuration data"}, 200
@ZKConnection(config)
def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None):
"""
Receive a VM configuration from a remote system
This function requires some explanation.
We get a full JSON dump of the VM configuration as provided by `pvc vm info`. This contains all the information we
reasonably need to replicate the VM at the given snapshot, including metainformation.
First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists,
this is an issue and we have to error. But this should have already happened with the RBD volumes.
"""
print(vm_config)
def parse_unified_diff(diff_text, original_text):
"""
Take a unified diff and apply it to an original string
"""
# Split the original string into lines
original_lines = original_text.splitlines(keepends=True)
patched_lines = []
original_idx = 0 # Track position in original lines
diff_lines = diff_text.splitlines(keepends=True)
for line in diff_lines:
if line.startswith("---") or line.startswith("+++"):
# Ignore prefix lines
continue
if line.startswith("@@"):
# Extract line numbers from the diff hunk header
hunk_header = line
parts = hunk_header.split(" ")
original_range = parts[1]
# Get the starting line number and range length for the original file
original_start, _ = map(int, original_range[1:].split(","))
# Adjust for zero-based indexing
original_start -= 1
# Add any lines between the current index and the next hunk's start
while original_idx < original_start:
patched_lines.append(original_lines[original_idx])
original_idx += 1
elif line.startswith("-"):
# This line should be removed from the original, skip it
original_idx += 1
elif line.startswith("+"):
# This line should be added to the patched version, removing the '+'
patched_lines.append(line[1:])
else:
# Context line (unchanged), it has no prefix, add from the original
patched_lines.append(original_lines[original_idx])
original_idx += 1
# Add any remaining lines from the original file after the last hunk
patched_lines.extend(original_lines[original_idx:])
return "".join(patched_lines).strip()
# Get our XML configuration for this snapshot
# We take the main XML configuration, then apply the diff for this particular incremental
current_snapshot = [s for s in vm_config["snapshots"] if s["name"] == snapshot][0]
vm_xml = vm_config["xml"]
vm_xml_diff = "\n".join(current_snapshot["xml_diff_lines"])
snapshot_vm_xml = parse_unified_diff(vm_xml_diff, vm_xml)
if (
source_snapshot is not None
or pvc_vm.searchClusterByUUID(zkhandler, vm_config["uuid"]) is not None
):
logger.info(
f"Receiving incremental VM configuration for {vm_config['name']}@{snapshot}"
)
# Modify the VM based on our passed detail
retcode, retmsg = pvc_vm.modify_vm(
zkhandler,
vm_config["uuid"],
False,
snapshot_vm_xml,
)
retcode, retmsg = pvc_vm.modify_vm_metadata(
zkhandler,
vm_config["uuid"],
None, # Node limits are left unchanged
vm_config["node_selector"],
vm_config["node_autostart"],
vm_config["profile"],
vm_config["migration_method"],
vm_config["migration_max_downtime"],
)
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
new_vm_tags = [t["name"] for t in vm_config["tags"]]
remove_tags = []
add_tags = []
for tag in vm_config["tags"]:
if tag["name"] not in current_vm_tags:
add_tags.append((tag["name"], tag["protected"]))
for tag in current_vm_tags:
if tag not in new_vm_tags:
remove_tags.append(tag)
for tag in add_tags:
name, protected = tag
pvc_vm.modify_vm_tag(
zkhandler, vm_config["uuid"], "add", name, protected=protected
)
for tag in remove_tags:
pvc_vm.modify_vm_tag(zkhandler, vm_config["uuid"], "remove", name)
else:
logger.info(
f"Receiving full VM configuration for {vm_config['name']}@{snapshot}"
)
# Define the VM based on our passed detail
retcode, retmsg = pvc_vm.define_vm(
zkhandler,
snapshot_vm_xml,
None, # Target node is autoselected
None, # Node limits are invalid here so ignore them
vm_config["node_selector"],
vm_config["node_autostart"],
vm_config["migration_method"],
vm_config["migration_max_downtime"],
vm_config["profile"],
vm_config["tags"],
"mirror",
)
# Add this snapshot to the VM manually in Zookeeper
zkhandler.write(
[
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.name",
snapshot,
),
snapshot,
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.timestamp",
snapshot,
),
current_snapshot["timestamp"],
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.xml",
snapshot,
),
snapshot_vm_xml,
),
(
(
"domain.snapshots",
vm_config["uuid"],
"domain_snapshot.rbd_snapshots",
snapshot,
),
",".join(current_snapshot["rbd_snapshots"]),
),
]
)
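The receiver above consumes the full JSON dump produced for `pvc vm info` on the source. A minimal sketch of only the fields this function actually reads (all values invented; the real payload carries much more):

vm_config = {
    "name": "testvm",
    "uuid": "00000000-0000-0000-0000-000000000000",
    "xml": "<domain>...</domain>",                   # current Libvirt XML
    "node_selector": "none",
    "node_autostart": False,
    "profile": "default",
    "migration_method": "live",
    "migration_max_downtime": 300,
    "tags": [{"name": "mirrored", "protected": False}],
    "snapshots": [
        {
            "name": "snap20240101",
            "timestamp": "1704067200",
            "xml_diff_lines": [],                    # unified diff against the current XML
            "rbd_snapshots": ["vms/testvm_disk0@snap20240101"],
        }
    ],
}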
#
# Network functions
#

View File

@@ -2018,102 +2018,6 @@ def cli_vm_snapshot_import(
    finish(retcode, retmsg)
###############################################################################
# > pvc vm snapshot send
###############################################################################
@click.command(
name="send",
short_help="Send a snapshot of a virtual machine to another PVC cluster.",
)
@connection_req
@click.argument("domain")
@click.argument("snapshot_name")
@click.argument("destination")
@click.option(
"-k",
"--destination-api-key",
"destination_api_key",
default=None,
help="The API key of the destination cluster when specifying an API URI.",
)
@click.option(
"-p",
"--destination-pool",
"destination_storage_pool",
default=None,
help="The target storage pool on the destination cluster, if it differs from the source pool.",
)
@click.option(
"-i",
"--incremental",
"incremental_parent",
default=None,
help="Perform an incremental volume send from this parent snapshot.",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress if waiting",
)
def cli_vm_snapshot_send(
domain,
snapshot_name,
destination,
destination_api_key,
destination_storage_pool,
incremental_parent,
wait_flag,
):
"""
Send the (existing) snapshot SNAPSHOT_NAME of virtual machine DOMAIN to the remote PVC cluster DESTINATION.
DOMAIN may be a UUID or name. DESTINATION may be either a configured PVC connection name in this CLI instance (i.e. a valid argument to "--connection"), or a full API URI, including the scheme, port and API prefix; if using the latter, an API key can be specified with the "-k"/"--destination-api-key" option.
The send will include the VM configuration, metainfo, and a point-in-time snapshot of all attached RBD volumes.
By default, the storage pool of the sending cluster will be used at the destination cluster as well. If a pool of that name does not exist, specify one with the "-p"/"--destination-pool" option.
Incremental sends are possible by specifying the "-i"/"--incremental" option along with a parent snapshot name. To correctly receive, that parent snapshot must exist on DESTINATION. Subsequent sends after the first do not have to be incremental, but an incremental send is likely to perform better than a full send if the VM experiences few writes.
WARNING: Once sent, the VM will be in the state "mirror" on the remote cluster. If it is subsequently started, for instance for disaster recovery, a new snapshot must be taken on the remote side and sent back or data will be inconsistent between the instances. Only VMs in the "mirror" state can accept new sends.
WARNING: This functionality has no automatic backout on the remote side. While a properly configured cluster should not fail any step in the process, a situation like an intermittent network connection might cause a failure which would have to be manually corrected on that side, usually by removing the mirrored VM and retrying, or rolling back to a previous snapshot and retrying. Future versions may enhance automatic recovery, but for now this would be up to the administrator.
"""
connections_config = get_store(CLI_CONFIG["store_path"])
if destination in connections_config.keys():
destination_cluster_config = connections_config[destination]
destination_api_uri = "{}://{}:{}{}".format(
destination_cluster_config["scheme"],
destination_cluster_config["host"],
destination_cluster_config["port"],
CLI_CONFIG["api_prefix"],
)
destination_api_key = destination_cluster_config["api_key"]
else:
destination_api_uri = destination
destination_api_key = destination_api_key
retcode, retmsg = pvc.lib.vm.vm_send_snapshot(
CLI_CONFIG,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=CLI_CONFIG.get("verify_ssl"),
destination_storage_pool=destination_storage_pool,
incremental_parent=incremental_parent,
wait_flag=wait_flag,
)
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
###############################################################################
# > pvc vm backup
###############################################################################
@@ -6684,7 +6588,6 @@ cli_vm_snapshot.add_command(cli_vm_snapshot_remove)
cli_vm_snapshot.add_command(cli_vm_snapshot_rollback)
cli_vm_snapshot.add_command(cli_vm_snapshot_export)
cli_vm_snapshot.add_command(cli_vm_snapshot_import)
-cli_vm_snapshot.add_command(cli_vm_snapshot_send)
cli_vm.add_command(cli_vm_snapshot)
cli_vm_backup.add_command(cli_vm_backup_create)
cli_vm_backup.add_command(cli_vm_backup_restore)

View File

@@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
    output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
-    vm_states = ["start", "disable", "mirror"]
+    vm_states = ["start", "disable"]
    vm_states.extend(
        [
            state
            for state in data.get("vms", {}).keys()
-            if state not in ["total", "start", "disable", "mirror"]
+            if state not in ["total", "start", "disable"]
        ]
    )
@@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
            continue
        if state in ["start"]:
            state_colour = ansii["green"]
-        elif state in ["migrate", "disable", "provision", "mirror"]:
+        elif state in ["migrate", "disable", "provision"]:
            state_colour = ansii["blue"]
        elif state in ["stop", "fail"]:
            state_colour = ansii["red"]

View File

@@ -19,8 +19,6 @@
#
###############################################################################
-import sys
from click import progressbar
from time import sleep, time
@@ -107,7 +105,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
    # Start following the task state, updating progress as we go
    total_task = task_status.get("total")
-    with progressbar(length=total_task, width=20, show_eta=False) as bar:
+    with progressbar(length=total_task, show_eta=False) as bar:
        last_task = 0
        maxlen = 21
        echo(
@@ -117,39 +115,30 @@ def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
        )
        while True:
            sleep(0.5)
-            task_status = pvc.lib.common.task_status(
-                CLI_CONFIG, task_id=task_id, is_watching=True
-            )
            if isinstance(task_status, tuple):
                continue
            if task_status.get("state") != "RUNNING":
                break
-            if task_status.get("current") == 0:
-                continue
-            current_task = int(task_status.get("current"))
-            total_task = int(task_status.get("total"))
-            bar.length = total_task
-            if current_task > last_task:
-                bar.update(current_task - last_task)
-                last_task = current_task
+            if task_status.get("current") > last_task:
+                current_task = int(task_status.get("current"))
+                total_task = int(task_status.get("total"))
+                bar.length = total_task
+                bar.update(current_task - last_task)
+                last_task = current_task
+            # The extensive spaces at the end cause this to overwrite longer previous messages
            curlen = len(str(task_status.get("status")))
            if curlen > maxlen:
                maxlen = curlen
            lendiff = maxlen - curlen
            overwrite_whitespace = " " * lendiff
-            percent_complete = (current_task / total_task) * 100
-            bar_output = f"[{bar.format_bar()}] {percent_complete:3.0f}%"
-            sys.stdout.write(
-                f"\r {bar_output} {task_status['status']}{overwrite_whitespace}"
-            )
-            sys.stdout.flush()
+            echo(
+                CLI_CONFIG,
+                " " + task_status.get("status") + overwrite_whitespace,
+                newline=False,
+            )
+            task_status = pvc.lib.common.task_status(
+                CLI_CONFIG, task_id=task_id, is_watching=True
+            )
        if task_status.get("state") == "SUCCESS":
            bar.update(total_task - last_task)

View File

@@ -83,7 +83,7 @@ class UploadProgressBar(object):
        else:
            self.end_suffix = ""
-        self.bar = click.progressbar(length=self.length, width=20, show_eta=True)
+        self.bar = click.progressbar(length=self.length, show_eta=True)
    def update(self, monitor):
        bytes_cur = monitor.bytes_read

View File

@@ -595,43 +595,6 @@ def vm_import_snapshot(
    return get_wait_retdata(response, wait_flag)
def vm_send_snapshot(
config,
vm,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
destination_storage_pool=None,
incremental_parent=None,
wait_flag=True,
):
"""
Send an (existing) snapshot of a VM's disks and configuration to a destination PVC cluster, optionally
incremental with incremental_parent
API endpoint: POST /vm/{vm}/snapshot/send
API arguments: snapshot_name=snapshot_name, destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, incremental_parent=incremental_parent
API schema: {"message":"{data}"}
"""
params = {
"snapshot_name": snapshot_name,
"destination_api_uri": destination_api_uri,
"destination_api_key": destination_api_key,
"destination_api_verify_ssl": destination_api_verify_ssl,
}
if destination_storage_pool is not None:
params["destination_storage_pool"] = destination_storage_pool
if incremental_parent is not None:
params["incremental_parent"] = incremental_parent
response = call_api(
config, "post", "/vm/{vm}/snapshot/send".format(vm=vm), params=params
)
return get_wait_retdata(response, wait_flag)
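A hedged sketch of driving the removed library function directly, mirroring what cli_vm_snapshot_send assembles from its connection store (CLI_CONFIG, the URI, and the key are stand-ins, not real values):

import pvc.lib.vm

CLI_CONFIG = {"verify_ssl": True}  # stand-in for the CLI's loaded configuration dict

retcode, retmsg = pvc.lib.vm.vm_send_snapshot(
    CLI_CONFIG,
    "testvm",
    "snap20240101",
    "https://remote.cluster:7370/api/v1",
    "remote-secret",
    destination_api_verify_ssl=True,
    destination_storage_pool=None,   # keep the source pool name on the remote
    incremental_parent=None,         # None means a full send
    wait_flag=True,                  # block on the Celery task
)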
def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_flag=True):
    """
    Perform a cluster VM autobackup
@@ -1797,7 +1760,6 @@ def format_info(config, domain_information, long_output):
        "provision": ansiprint.blue(),
        "restore": ansiprint.blue(),
        "import": ansiprint.blue(),
-        "mirror": ansiprint.blue(),
    }
    ainformation.append(
        "{}State:{} {}{}{}".format(
@@ -2350,6 +2312,9 @@ def format_list(config, vm_list):
                        ansiprint.end(),
                    )
                )
+                # Fix the length due to the extra fake characters
+                vm_nets_length -= len(net_vni)
+                vm_nets_length += len(net_string_list[net_idx])
            else:
                net_string_list.append(net_vni)
@@ -2379,8 +2344,7 @@ def format_list(config, vm_list):
                vm_state=domain_information["state"],
                vm_tags=",".join(tag_list),
                vm_snapshots=len(domain_information.get("snapshots", list())),
-                vm_networks=",".join(net_string_list)
-                + ("" if all(net_invalid_list) else " "),
+                vm_networks=",".join(net_string_list),
                vm_memory=domain_information["memory"],
                vm_vcpu=domain_information["vcpu"],
                vm_node=domain_information["node"],

View File

@@ -1092,17 +1092,17 @@ def rollback_snapshot(zkhandler, pool, volume, name):
        ),
    )
    # 1. Roll back the snapshot
    retcode, stdout, stderr = common.run_os_command(
        "rbd snap rollback {}/{}@{}".format(pool, volume, name)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
                volume, pool, name, stderr
            ),
        )
    return True, 'Rolled back RBD volume "{}" in pool "{}" to snapshot "{}".'.format(
        volume, pool, name
@@ -1168,14 +1168,11 @@ def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzz
            continue
        if target_volume and volume_name != target_volume:
            continue
-        try:
-            snapshot_stats = json.loads(
-                zkhandler.read(
-                    ("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
-                )
-            )
-        except Exception:
-            snapshot_stats = []
+        snapshot_stats = json.loads(
+            zkhandler.read(
+                ("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
+            )
+        )
        if limit:
            try:
                if re.fullmatch(limit, snapshot_name):

View File

@@ -1160,7 +1160,6 @@ def get_resource_metrics(zkhandler):
        "fail": 8,
        "import": 9,
        "restore": 10,
-        "mirror": 99,
    }
    state = vm["state"]
    output_lines.append(

View File

@@ -85,7 +85,6 @@ vm_state_combinations = [
    "provision",
    "import",
    "restore",
-    "mirror",
]
ceph_osd_state_combinations = [
    "up,in",

View File

@@ -580,7 +580,7 @@ def rename_vm(zkhandler, domain, new_domain):
    # Verify that the VM is in a stopped state; renaming is not supported otherwise
    state = zkhandler.read(("domain.state", dom_uuid))
-    if state not in ["stop", "disable", "mirror"]:
+    if state not in ["stop", "disable"]:
        return (
            False,
            'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
@@ -1125,7 +1125,6 @@ def get_list(
        "migrate",
        "unmigrate",
        "provision",
-        "mirror",
    ]
    if state not in valid_states:
        return False, 'VM state "{}" is not valid.'.format(state)
@@ -1904,10 +1903,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
    # Check that the domain is stopped (unless force_unlock is set)
    domain_state = zkhandler.read(("domain.state", dom_uuid))
-    if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
+    if not force_unlock and domain_state not in ["stop", "disable", "fail"]:
        fail(
            celery,
-            f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
+            f"VM state {domain_state} not in [stop, disable, fail] and not forcing",
        )
        return False
@@ -2330,7 +2329,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
    # Verify that the VM is in a stopped state; renaming is not supported otherwise
    state = zkhandler.read(("domain.state", dom_uuid))
-    if state not in ["stop", "disable", "mirror"]:
+    if state not in ["stop", "disable"]:
        fail(
            celery,
            f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
@@ -3119,560 +3118,3 @@ def vm_worker_import_snapshot(
        current=current_stage,
        total=total_stages,
    )
def vm_worker_send_snapshot(
zkhandler,
celery,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
):
import requests
import rados
import rbd
from packaging.version import parse as parse_version
current_stage = 0
total_stages = 1
start(
celery,
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
current=current_stage,
total=total_stages,
)
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
# Get our side's VM configuration details
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
if not isinstance(vm_detail, dict):
fail(
celery,
f"VM listing returned invalid data: {vm_detail}",
)
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
vm_name = vm_detail["name"]
# Validate that the destination cluster can be reached
destination_api_timeout = (3.05, 172800)
destination_api_headers = {
"X-Api-Key": destination_api_key,
}
session = requests.Session()
session.headers.update(destination_api_headers)
session.verify = destination_api_verify_ssl
session.timeout = destination_api_timeout
try:
# Hit the API root; this should return "PVC API version x"
response = session.get(
f"{destination_api_uri}/",
timeout=destination_api_timeout,
params=None,
data=None,
)
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = session.get(
f"{destination_api_uri}/status",
params=None,
data=None,
)
destination_cluster_status = response.json()
current_destination_pvc_version = destination_cluster_status.get(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
"Connection to remote API failed: no PVC version information returned",
)
return False
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
# Work around development versions
current_destination_pvc_version = re.sub(
r"~git-.*", "", current_destination_pvc_version
)
# Compare versions
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
# Check if the VM already exists on the remote
response = session.get(
f"{destination_api_uri}/vm/{domain}",
params=None,
data=None,
)
destination_vm_status = response.json()
if type(destination_vm_status) is list and len(destination_vm_status) > 0:
destination_vm_status = destination_vm_status[0]
else:
destination_vm_status = {}
current_destination_vm_state = destination_vm_status.get("state", None)
if (
current_destination_vm_state is not None
and current_destination_vm_state != "mirror"
):
fail(
celery,
"Remote PVC VM exists and is not a mirror",
)
return False
# Get details about VM snapshot
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
[
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.name",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.timestamp",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.xml",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.rbd_snapshots",
snapshot_name,
)
),
]
)
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
# Get details about remote VM snapshots
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
# Check if this snapshot is in the remote list already
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
# Check if this snapshot is older than the latest remote VM snapshot
if (
len(destination_vm_snapshots) > 0
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
):
fail(
celery,
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
)
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
# Begin send, set stages
total_stages = 2 + (3 * len(snapshot_rbdsnaps))
current_stage += 1
update(
celery,
f"Sending VM configuration for {vm_name}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
send_params = {
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {e}",
)
return False
# Create the block devices on the remote side if this is a new VM send
block_t_start = time.time()
block_total_mb = 0
for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
rbd_name = rbd_detail["name"]
pool, volume = rbd_name.split("/")
current_stage += 1
update(
celery,
f"Preparing remote volume for {rbd_name}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Get the storage volume details
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
if not retcode or len(retdata) != 1:
if len(retdata) < 1:
error_message = f"No detail returned for volume {rbd_name}"
elif len(retdata) > 1:
error_message = f"Multiple details returned for volume {rbd_name}"
else:
error_message = f"Error getting details for volume {rbd_name}"
fail(
celery,
error_message,
)
return False
try:
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
except Exception as e:
error_message = f"Failed to get volume size for {rbd_name}: {e}"
if destination_storage_pool is not None:
pool = destination_storage_pool
current_stage += 1
update(
celery,
f"Checking remote volume {rbd_name} for compliance",
current=current_stage,
total=total_stages,
)
# Check if the volume exists on the target
response = session.get(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
params=None,
data=None,
)
if response.status_code != 404 and current_destination_vm_state is None:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
if current_destination_vm_state is not None:
try:
remote_volume_size = ceph.format_bytes_fromhuman(
response.json()[0]["stats"]["size"]
)
except Exception as e:
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
fail(celery, error_message)
return False
if local_volume_size != remote_volume_size:
response = session.put(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
params={"new_size": local_volume_size, "force": True},
)
if response.status_code != 200:
fail(
celery,
"Failed to resize remote volume to match local volume",
)
return False
# Send the volume to the remote
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
size = image.size()
chunk_size_mb = 1024
if incremental_parent is not None:
# Diff between incremental_parent and snapshot
celery_message = (
f"Sending diff of {rbd_name}@{incremental_parent}{snapshot_name}"
)
else:
# Full image transfer
celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
current_stage += 1
update(
celery,
celery_message,
current=current_stage,
total=total_stages,
)
if incremental_parent is not None:
# Create a single session to reuse connections
send_params = {
"pool": pool,
"volume": volume,
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
session.params.update(send_params)
# Send 32 objects (128MB) at once
send_max_objects = 32
batch_size_mb = 4 * send_max_objects
batch_size = batch_size_mb * 1024 * 1024
total_chunks = 0
def diff_cb_count(offset, length, exists):
nonlocal total_chunks
if exists:
total_chunks += 1
current_chunk = 0
buffer = list()
buffer_size = 0
last_chunk_time = time.time()
def send_batch_multipart(buffer):
nonlocal last_chunk_time
files = {}
for i in range(len(buffer)):
files[f"object_{i}"] = (
f"object_{i}",
buffer[i],
"application/octet-stream",
)
try:
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send diff batch ({e}): {response.json()['message']}",
)
return False
current_chunk_time = time.time()
chunk_time = current_chunk_time - last_chunk_time
last_chunk_time = current_chunk_time
chunk_speed = round(batch_size_mb / chunk_time, 1)
update(
celery,
celery_message + f" ({chunk_speed} MB/s)",
current=current_stage,
total=total_stages,
)
def add_block_to_multipart(buffer, offset, length, data):
part_data = (
offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
) # Add header and data
buffer.append(part_data)
def diff_cb_send(offset, length, exists):
nonlocal current_chunk, buffer, buffer_size
if exists:
# Read the data for the current block
data = image.read(offset, length)
# Add the block to the multipart buffer
add_block_to_multipart(buffer, offset, length, data)
current_chunk += 1
buffer_size += len(data)
if buffer_size >= batch_size:
send_batch_multipart(buffer)
buffer.clear() # Clear the buffer after sending
buffer_size = 0 # Reset buffer size
try:
image.set_snap(snapshot_name)
image.diff_iterate(
0, size, incremental_parent, diff_cb_count, whole_object=True
)
block_total_mb += total_chunks * 4
image.diff_iterate(
0, size, incremental_parent, diff_cb_send, whole_object=True
)
if buffer:
send_batch_multipart(buffer)
buffer.clear() # Clear the buffer after sending
buffer_size = 0 # Reset buffer size
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
else:
def full_chunker():
nonlocal block_total_mb
chunk_size = 1024 * 1024 * chunk_size_mb
current_chunk = 0
last_chunk_time = time.time()
while current_chunk < size:
chunk = image.read(current_chunk, chunk_size)
yield chunk
current_chunk += chunk_size
block_total_mb += len(chunk) / 1024 / 1024
current_chunk_time = time.time()
chunk_time = current_chunk_time - last_chunk_time
last_chunk_time = current_chunk_time
chunk_speed = round(chunk_size_mb / chunk_time, 1)
update(
celery,
celery_message + f" ({chunk_speed} MB/s)",
current=current_stage,
total=total_stages,
)
send_params = {
"pool": pool,
"volume": volume,
"snapshot": snapshot_name,
"size": size,
"source_snapshot": incremental_parent,
}
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
headers={"Content-Type": "application/octet-stream"},
params=send_params,
data=full_chunker(),
)
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
send_params = {
"pool": pool,
"volume": volume,
"snapshot": snapshot_name,
}
try:
response = session.patch(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params,
)
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
block_t_end = time.time()
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
current_stage += 1
return finish(
celery,
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
current=current_stage,
total=total_stages,
)
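The incremental branch above walks the RBD diff twice: once with diff_cb_count to total the changed extents for progress reporting, then again with diff_cb_send to ship them. A minimal standalone sketch of that two-pass pattern, assuming a reachable Ceph cluster, the python3-rados/python3-rbd bindings, and invented pool/image/snapshot names:

import rados
import rbd

cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
cluster.connect()
ioctx = cluster.open_ioctx("vms")  # pool name is illustrative
image = rbd.Image(ioctx, name="testvm_disk0", snapshot="snap-new", read_only=True)

changed = []

def count_cb(offset, length, exists):
    # First pass: record which extents actually contain changed data
    if exists:
        changed.append((offset, length))

image.diff_iterate(0, image.size(), "snap-old", count_cb, whole_object=True)
print(f"{len(changed)} changed extents to send")

for offset, length in changed:
    data = image.read(offset, length)  # second pass: read each extent
    # ... batch into multipart requests, as vm_worker_send_snapshot does ...

image.close()
ioctx.close()
cluster.shutdown()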

View File

@@ -33,7 +33,6 @@ from daemon_lib.vm import (
    vm_worker_rollback_snapshot,
    vm_worker_export_snapshot,
    vm_worker_import_snapshot,
-    vm_worker_send_snapshot,
)
from daemon_lib.ceph import (
    osd_worker_add_osd,
@@ -228,54 +227,6 @@ def vm_import_snapshot(
)
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
def vm_send_snapshot(
self,
domain=None,
snapshot_name=None,
destination_api_uri="",
destination_api_key="",
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
run_on="primary",
):
@ZKConnection(config)
def run_vm_send_snapshot(
zkhandler,
self,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
):
return vm_worker_send_snapshot(
zkhandler,
self,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
)
return run_vm_send_snapshot(
self,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
)
@celery.task(name="osd.add", bind=True, routing_key="run_on") @celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add( def osd_add(
self, self,