Compare commits
No commits in common. "df4d437d31e548743c2e60ec1fb6c552826d15e4" and "7fe1262887f98791dbd6b0111b204ab2c796c276" have entirely different histories.
df4d437d31
...
7fe1262887
@ -1985,7 +1985,7 @@ class API_VM_Root(Resource):
|
||||
@Authenticator
|
||||
def post(self, reqargs):
|
||||
"""
|
||||
Define/create a new virtual machine
|
||||
Create a new virtual machine
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
@ -2143,10 +2143,8 @@ class API_VM_Element(Resource):
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Define/create a new virtual machine
|
||||
|
||||
Create new {vm}
|
||||
Note: The name {vm} is ignored; only the "name" value from the Libvirt XML is used
|
||||
|
||||
This endpoint is identical to "POST /api/v1/vm"
|
||||
---
|
||||
tags:
|
||||
@ -2803,22 +2801,10 @@ class API_VM_Locks(Resource):
|
||||
- vm
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
properties:
|
||||
task_id:
|
||||
type: string
|
||||
description: The internal Celery job ID of the task
|
||||
task_name:
|
||||
type: string
|
||||
description: The internal Celery job task name
|
||||
run_on:
|
||||
type: string
|
||||
description: The worker daemon instance assigned to run the task
|
||||
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
"""
|
||||
vm_node_detail, retcode = api_helper.vm_node(vm)
|
||||
if retcode == 200:
|
||||
@ -2938,12 +2924,11 @@ class API_VM_Device(Resource):
|
||||
required: true
|
||||
description: The raw Libvirt XML definition of the device to attach
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -2993,12 +2978,11 @@ class API_VM_Device(Resource):
|
||||
required: true
|
||||
description: The raw Libvirt XML definition of the device to detach
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -3250,12 +3234,11 @@ class API_VM_Snapshot(Resource):
|
||||
required: false
|
||||
description: A custom name for the snapshot instead of autogeneration by date
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
@ -3309,12 +3292,11 @@ class API_VM_Snapshot(Resource):
|
||||
required: true
|
||||
description: The name of the snapshot to remove
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
@ -3374,12 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
|
||||
required: true
|
||||
description: The name of the snapshot to roll back to
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
@ -3455,16 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
|
||||
description: The absolute file path to export the snapshot to on the active primary coordinator
|
||||
- in: query
|
||||
name: incremental_parent
|
||||
type: string
|
||||
type: boolean
|
||||
required: false
|
||||
description: A snapshot name to generate an incremental diff from
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
@ -3549,12 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
|
||||
default: true
|
||||
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
@ -3593,428 +3572,6 @@ class API_VM_Snapshot_Import(Resource):
|
||||
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
|
||||
|
||||
|
||||
# /vm/<vm>/snapshot/send
|
||||
class API_VM_Snapshot_Send(Resource):
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "snapshot_name",
|
||||
"required": True,
|
||||
"helptext": "A snapshot name must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_uri",
|
||||
"required": True,
|
||||
"helptext": "A destination API URI must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_key",
|
||||
"required": True,
|
||||
"helptext": "A destination API key must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_verify_ssl",
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"name": "incremental_parent",
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"name": "destination_storage_pool",
|
||||
"required": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Send a snapshot of a VM's disks and configuration to another PVC cluster
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: snapshot_name
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the snapshot to export (must exist)
|
||||
- in: query
|
||||
name: destination_api_uri
|
||||
type: string
|
||||
required: true
|
||||
description: The base API URI of the destination PVC cluster (with prefix if applicable)
|
||||
- in: query
|
||||
name: destination_api_key
|
||||
type: string
|
||||
required: true
|
||||
description: The API authentication key of the destination PVC cluster
|
||||
- in: query
|
||||
name: destination_api_verify_ssl
|
||||
type: boolean
|
||||
required: false
|
||||
default: true
|
||||
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
|
||||
- in: query
|
||||
name: incremental_parent
|
||||
type: string
|
||||
required: false
|
||||
description: A snapshot name to generate an incremental diff from; incremental send only if unset
|
||||
- in: query
|
||||
name: destination_storage_pool
|
||||
type: string
|
||||
required: false
|
||||
default: source storage pool name
|
||||
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
snapshot_name = reqargs.get("snapshot_name", None)
|
||||
destination_api_uri = reqargs.get("destination_api_uri", None)
|
||||
destination_api_key = reqargs.get("destination_api_key", None)
|
||||
destination_api_verify_ssl = bool(
|
||||
strtobool(reqargs.get("destination_api_verify_ssl", "true"))
|
||||
)
|
||||
incremental_parent = reqargs.get("incremental_parent", None)
|
||||
destination_storage_pool = reqargs.get("destination_storage_pool", None)
|
||||
|
||||
task = run_celery_task(
|
||||
"vm.send_snapshot",
|
||||
domain=vm,
|
||||
snapshot_name=snapshot_name,
|
||||
destination_api_uri=destination_api_uri,
|
||||
destination_api_key=destination_api_key,
|
||||
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||
incremental_parent=incremental_parent,
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
run_on="primary",
|
||||
)
|
||||
|
||||
return (
|
||||
{
|
||||
"task_id": task.id,
|
||||
"task_name": "vm.send_snapshot",
|
||||
"run_on": f"{get_primary_node()} (primary)",
|
||||
},
|
||||
202,
|
||||
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
|
||||
)
|
||||
|
||||
|
||||
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
|
||||
|
||||
|
||||
# /vm/<vm>/snapshot/receive/block
|
||||
class API_VM_Snapshot_Receive_Block(Resource):
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "pool",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "size",
|
||||
"required": True,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Receive a full snapshot of a single RBD volume from another PVC cluster
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
- in: query
|
||||
name: size
|
||||
type: integer
|
||||
required: true
|
||||
description: The size in bytes of the Ceph RBD volume
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block_full(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
int(reqargs.get("size")),
|
||||
flask.request,
|
||||
)
|
||||
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "pool",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "source_snapshot",
|
||||
"required": True,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def put(self, vm, reqargs):
|
||||
"""
|
||||
Receive a single diff element from a snapshot of a single RBD volume from another PVC cluster
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
- in: query
|
||||
name: source_snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot parent
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block_diff(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
reqargs.get("source_snapshot"),
|
||||
flask.request,
|
||||
)
|
||||
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "pool",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "volume",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def patch(self, vm, reqargs):
|
||||
"""
|
||||
Create the block snapshot at snapshot of volume
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_block_createsnap(
|
||||
reqargs.get("pool"),
|
||||
reqargs.get("volume"),
|
||||
reqargs.get("snapshot"),
|
||||
)
|
||||
|
||||
|
||||
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
|
||||
|
||||
|
||||
# /vm/<vm>/snapshot/receive/config
|
||||
class API_VM_Snapshot_Receive_Config(Resource):
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "snapshot",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"name": "source_snapshot",
|
||||
"required": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Receive a snapshot of a VM configuration from another PVC cluster
|
||||
|
||||
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: pool
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD data pool
|
||||
- in: query
|
||||
name: volume
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume
|
||||
- in: query
|
||||
name: snapshot
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the destination Ceph RBD volume snapshot
|
||||
- in: query
|
||||
name: size
|
||||
type: integer
|
||||
required: true
|
||||
description: The size in bytes of the Ceph RBD volume
|
||||
- in: query
|
||||
name: source_snapshot
|
||||
type: string
|
||||
required: false
|
||||
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.vm_snapshot_receive_config(
|
||||
reqargs.get("snapshot"),
|
||||
flask.request.get_json(),
|
||||
source_snapshot=reqargs.get("source_snapshot"),
|
||||
)
|
||||
|
||||
|
||||
api.add_resource(API_VM_Snapshot_Receive_Config, "/vm/<vm>/snapshot/receive/config")
|
||||
|
||||
|
||||
# /vm/autobackup
|
||||
class API_VM_Autobackup_Root(Resource):
|
||||
@RequestParser(
|
||||
@ -4044,12 +3601,14 @@ class API_VM_Autobackup_Root(Resource):
|
||||
type: string
|
||||
example: "user@domain.tld"
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
properties:
|
||||
task_id:
|
||||
type: string
|
||||
description: Task ID for the provisioner Celery worker
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -5535,17 +5094,11 @@ class API_Storage_Ceph_Benchmark(Resource):
|
||||
required: false
|
||||
description: An optional override name for the job
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the benchmark (unused elsewhere)
|
||||
"""
|
||||
# Verify that the pool is valid
|
||||
_list, code = api_helper.ceph_pool_list(
|
||||
@ -5672,12 +5225,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
|
||||
required: true
|
||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -5872,12 +5424,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
required: false
|
||||
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -5988,12 +5539,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
required: false
|
||||
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -6047,12 +5597,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
required: true
|
||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
@ -6106,7 +5655,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
- in: query
|
||||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
required: flase
|
||||
description: Force removal even if some step(s) fail
|
||||
- in: query
|
||||
name: yes-i-really-mean-it
|
||||
@ -6114,12 +5663,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
required: true
|
||||
description: A confirmation string to ensure that the API consumer really means it
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
@ -6707,7 +6255,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
|
||||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: false
|
||||
default: flase
|
||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
@ -6784,7 +6332,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: false
|
||||
default: flase
|
||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
@ -6823,7 +6371,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||
- storage / ceph
|
||||
parameters:
|
||||
- in: query
|
||||
name: new_size
|
||||
name: size
|
||||
type: string
|
||||
required: false
|
||||
description: The new volume size in bytes (or with a metric suffix, i.e. k/M/G/T); must be greater than the previous size (shrinking not supported)
|
||||
@ -6836,7 +6384,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: false
|
||||
default: flase
|
||||
description: Force action if new volume size would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
@ -6961,7 +6509,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
|
||||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: false
|
||||
default: flase
|
||||
description: Force action if clone volume size would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
@ -9816,12 +9364,14 @@ class API_Provisioner_Create_Root(Resource):
|
||||
type: string
|
||||
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
description: The Celery job information of the task
|
||||
id: CeleryTask
|
||||
properties:
|
||||
task_id:
|
||||
type: string
|
||||
description: Task ID for the provisioner Celery worker
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -21,9 +21,7 @@
|
||||
|
||||
import flask
|
||||
import json
|
||||
import logging
|
||||
import lxml.etree as etree
|
||||
import sys
|
||||
|
||||
from re import match
|
||||
from requests import get
|
||||
@ -42,15 +40,6 @@ import daemon_lib.network as pvc_network
|
||||
import daemon_lib.ceph as pvc_ceph
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(logging.INFO)
|
||||
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
|
||||
#
|
||||
# Cluster base functions
|
||||
#
|
||||
@ -1291,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
|
||||
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
|
||||
)
|
||||
|
||||
if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
|
||||
if retdata[0].get("state") not in ["stop", "disable"]:
|
||||
return {"message": "VM must be stopped to flush locks"}, 400
|
||||
|
||||
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
|
||||
@ -1305,313 +1294,6 @@ def vm_flush_locks(zkhandler, vm):
|
||||
return output, retcode
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, request):
|
||||
"""
|
||||
Receive an RBD volume from a remote system
|
||||
"""
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
_, rbd_detail = pvc_ceph.get_list_volume(
|
||||
zkhandler, pool, limit=volume, is_fuzzy=False
|
||||
)
|
||||
if len(rbd_detail) > 0:
|
||||
volume_exists = True
|
||||
else:
|
||||
volume_exists = False
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
|
||||
if not volume_exists:
|
||||
rbd_inst = rbd.RBD()
|
||||
rbd_inst.create(ioctx, volume, size)
|
||||
retflag, retdata = pvc_ceph.add_volume(
|
||||
zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
|
||||
)
|
||||
if not retflag:
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
|
||||
output = {"message": retdata.replace('"', "'")}
|
||||
return output, retcode
|
||||
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
last_chunk = 0
|
||||
chunk_size = 1024 * 1024 * 1024
|
||||
|
||||
logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
|
||||
while True:
|
||||
chunk = request.stream.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
image.write(chunk, last_chunk)
|
||||
last_chunk += len(chunk)
|
||||
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
return {"message": "Successfully received RBD block device"}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block_diff(
|
||||
zkhandler, pool, volume, snapshot, source_snapshot, request
|
||||
):
|
||||
"""
|
||||
Receive an RBD volume from a remote system
|
||||
"""
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, volume)
|
||||
|
||||
if len(request.files) > 0:
|
||||
logger.info(f"Applying {len(request.files)} RBD diff chunks for {snapshot}")
|
||||
|
||||
for i in range(len(request.files)):
|
||||
object_key = f"object_{i}"
|
||||
if object_key in request.files:
|
||||
object_data = request.files[object_key].read()
|
||||
offset = int.from_bytes(object_data[:8], "big")
|
||||
length = int.from_bytes(object_data[8:16], "big")
|
||||
data = object_data[16 : 16 + length]
|
||||
logger.info(f"Applying RBD diff chunk at {offset} ({length} bytes)")
|
||||
image.write(data, offset)
|
||||
else:
|
||||
return {"message": "No data received"}, 400
|
||||
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
return {
|
||||
"message": f"Successfully received {len(request.files)} RBD diff chunks"
|
||||
}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
|
||||
"""
|
||||
Create the snapshot of a remote volume
|
||||
"""
|
||||
import rados
|
||||
import rbd
|
||||
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, volume)
|
||||
image.create_snap(snapshot)
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
retflag, retdata = pvc_ceph.add_snapshot(
|
||||
zkhandler, pool, volume, snapshot, zk_only=True
|
||||
)
|
||||
if not retflag:
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
|
||||
output = {"message": retdata.replace('"', "'")}
|
||||
return output, retcode
|
||||
|
||||
return {"message": "Successfully received VM configuration data"}, 200
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None):
|
||||
"""
|
||||
Receive a VM configuration from a remote system
|
||||
|
||||
This function requires some explanation.
|
||||
|
||||
We get a full JSON dump of the VM configuration as provided by `pvc vm info`. This contains all the information we
|
||||
reasonably need to replicate the VM at the given snapshot, including metainformation.
|
||||
|
||||
First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists,
|
||||
this is an issue and we have to error. But this should have already happened with the RBD volumes.
|
||||
"""
|
||||
print(vm_config)
|
||||
|
||||
def parse_unified_diff(diff_text, original_text):
|
||||
"""
|
||||
Take a unified diff and apply it to an original string
|
||||
"""
|
||||
# Split the original string into lines
|
||||
original_lines = original_text.splitlines(keepends=True)
|
||||
patched_lines = []
|
||||
original_idx = 0 # Track position in original lines
|
||||
|
||||
diff_lines = diff_text.splitlines(keepends=True)
|
||||
|
||||
for line in diff_lines:
|
||||
if line.startswith("---") or line.startswith("+++"):
|
||||
# Ignore prefix lines
|
||||
continue
|
||||
if line.startswith("@@"):
|
||||
# Extract line numbers from the diff hunk header
|
||||
hunk_header = line
|
||||
parts = hunk_header.split(" ")
|
||||
original_range = parts[1]
|
||||
|
||||
# Get the starting line number and range length for the original file
|
||||
original_start, _ = map(int, original_range[1:].split(","))
|
||||
|
||||
# Adjust for zero-based indexing
|
||||
original_start -= 1
|
||||
|
||||
# Add any lines between the current index and the next hunk's start
|
||||
while original_idx < original_start:
|
||||
patched_lines.append(original_lines[original_idx])
|
||||
original_idx += 1
|
||||
|
||||
elif line.startswith("-"):
|
||||
# This line should be removed from the original, skip it
|
||||
original_idx += 1
|
||||
elif line.startswith("+"):
|
||||
# This line should be added to the patched version, removing the '+'
|
||||
patched_lines.append(line[1:])
|
||||
else:
|
||||
# Context line (unchanged), it has no prefix, add from the original
|
||||
patched_lines.append(original_lines[original_idx])
|
||||
original_idx += 1
|
||||
|
||||
# Add any remaining lines from the original file after the last hunk
|
||||
patched_lines.extend(original_lines[original_idx:])
|
||||
|
||||
return "".join(patched_lines).strip()
|
||||
|
||||
# Get our XML configuration for this snapshot
|
||||
# We take the main XML configuration, then apply the diff for this particular incremental
|
||||
current_snapshot = [s for s in vm_config["snapshots"] if s["name"] == snapshot][0]
|
||||
vm_xml = vm_config["xml"]
|
||||
vm_xml_diff = "\n".join(current_snapshot["xml_diff_lines"])
|
||||
snapshot_vm_xml = parse_unified_diff(vm_xml_diff, vm_xml)
|
||||
|
||||
if (
|
||||
source_snapshot is not None
|
||||
or pvc_vm.searchClusterByUUID(zkhandler, vm_config["uuid"]) is not None
|
||||
):
|
||||
logger.info(
|
||||
f"Receiving incremental VM configuration for {vm_config['name']}@{snapshot}"
|
||||
)
|
||||
|
||||
# Modify the VM based on our passed detail
|
||||
retcode, retmsg = pvc_vm.modify_vm(
|
||||
zkhandler,
|
||||
vm_config["uuid"],
|
||||
False,
|
||||
snapshot_vm_xml,
|
||||
)
|
||||
retcode, retmsg = pvc_vm.modify_vm_metadata(
|
||||
zkhandler,
|
||||
vm_config["uuid"],
|
||||
None, # Node limits are left unchanged
|
||||
vm_config["node_selector"],
|
||||
vm_config["node_autostart"],
|
||||
vm_config["profile"],
|
||||
vm_config["migration_method"],
|
||||
vm_config["migration_max_downtime"],
|
||||
)
|
||||
|
||||
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
|
||||
new_vm_tags = [t["name"] for t in vm_config["tags"]]
|
||||
remove_tags = []
|
||||
add_tags = []
|
||||
for tag in vm_config["tags"]:
|
||||
if tag["name"] not in current_vm_tags:
|
||||
add_tags.append((tag["name"], tag["protected"]))
|
||||
for tag in current_vm_tags:
|
||||
if tag not in new_vm_tags:
|
||||
remove_tags.append(tag)
|
||||
|
||||
for tag in add_tags:
|
||||
name, protected = tag
|
||||
pvc_vm.modify_vm_tag(
|
||||
zkhandler, vm_config["uuid"], "add", name, protected=protected
|
||||
)
|
||||
for tag in remove_tags:
|
||||
pvc_vm.modify_vm_tag(zkhandler, vm_config["uuid"], "remove", name)
|
||||
else:
|
||||
logger.info(
|
||||
f"Receiving full VM configuration for {vm_config['name']}@{snapshot}"
|
||||
)
|
||||
|
||||
# Define the VM based on our passed detail
|
||||
retcode, retmsg = pvc_vm.define_vm(
|
||||
zkhandler,
|
||||
snapshot_vm_xml,
|
||||
None, # Target node is autoselected
|
||||
None, # Node limits are invalid here so ignore them
|
||||
vm_config["node_selector"],
|
||||
vm_config["node_autostart"],
|
||||
vm_config["migration_method"],
|
||||
vm_config["migration_max_downtime"],
|
||||
vm_config["profile"],
|
||||
vm_config["tags"],
|
||||
"mirror",
|
||||
)
|
||||
|
||||
# Add this snapshot to the VM manually in Zookeeper
|
||||
zkhandler.write(
|
||||
[
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
vm_config["uuid"],
|
||||
"domain_snapshot.name",
|
||||
snapshot,
|
||||
),
|
||||
snapshot,
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
vm_config["uuid"],
|
||||
"domain_snapshot.timestamp",
|
||||
snapshot,
|
||||
),
|
||||
current_snapshot["timestamp"],
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
vm_config["uuid"],
|
||||
"domain_snapshot.xml",
|
||||
snapshot,
|
||||
),
|
||||
snapshot_vm_xml,
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
vm_config["uuid"],
|
||||
"domain_snapshot.rbd_snapshots",
|
||||
snapshot,
|
||||
),
|
||||
",".join(current_snapshot["rbd_snapshots"]),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
# Network functions
|
||||
#
|
||||
|
@ -2018,102 +2018,6 @@ def cli_vm_snapshot_import(
|
||||
finish(retcode, retmsg)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# > pvc vm snapshot send
|
||||
###############################################################################
|
||||
@click.command(
|
||||
name="send",
|
||||
short_help="Send a snapshot of a virtual machine to another PVC cluster.",
|
||||
)
|
||||
@connection_req
|
||||
@click.argument("domain")
|
||||
@click.argument("snapshot_name")
|
||||
@click.argument("destination")
|
||||
@click.option(
|
||||
"-k",
|
||||
"--destination-api-key",
|
||||
"destination_api_key",
|
||||
default=None,
|
||||
help="The API key of the destination cluster when specifying an API URI.",
|
||||
)
|
||||
@click.option(
|
||||
"-p",
|
||||
"--destination-pool",
|
||||
"destination_storage_pool",
|
||||
default=None,
|
||||
help="The target storage pool on the destination cluster, if it differs from the source pool.",
|
||||
)
|
||||
@click.option(
|
||||
"-i",
|
||||
"--incremental",
|
||||
"incremental_parent",
|
||||
default=None,
|
||||
help="Perform an incremental volume send from this parent snapshot.",
|
||||
)
|
||||
@click.option(
|
||||
"--wait/--no-wait",
|
||||
"wait_flag",
|
||||
is_flag=True,
|
||||
default=True,
|
||||
show_default=True,
|
||||
help="Wait or don't wait for task to complete, showing progress if waiting",
|
||||
)
|
||||
def cli_vm_snapshot_send(
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination,
|
||||
destination_api_key,
|
||||
destination_storage_pool,
|
||||
incremental_parent,
|
||||
wait_flag,
|
||||
):
|
||||
"""
|
||||
Send the (existing) snapshot SNAPSHOT_NAME of virtual machine DOMAIN to the remote PVC cluster DESTINATION.
|
||||
|
||||
DOMAIN may be a UUID or name. DESTINATION may be either a configured PVC connection name in this CLI instance (i.e. a valid argument to "--connection"), or a full API URI, including the scheme, port and API prefix; if using the latter, an API key can be specified with the "-k"/"--destination-api-key" option.
|
||||
|
||||
The send will include the VM configuration, metainfo, and a point-in-time snapshot of all attached RBD volumes.
|
||||
|
||||
By default, the storage pool of the sending cluster will be used at the destination cluster as well. If a pool of that name does not exist, specify one with the "-p"/"--detination-pool" option.
|
||||
|
||||
Incremental sends are possible by specifying the "-i"/"--incremental-parent" option along with a parent snapshot name. To correctly receive, that parent snapshot must exist on DESTINATION. Subsequent sends after the first do not have to be incremental, but an incremental send is likely to perform better than a full send if the VM experiences few writes.
|
||||
|
||||
WARNING: Once sent, the VM will be in the state "mirror" on the remote cluster. If it is subsequently started, for instance for disaster recovery, a new snapshot must be taken on the remote side and sent back or data will be inconsistent between the instances. Only VMs in the "mirror" state can accept new sends.
|
||||
|
||||
WARNING: This functionality has no automatic backout on the remote side. While a properly configured cluster should not fail any step in the process, a situation like an intermittent network connection might cause a failure which would have to be manually corrected on that side, usually by removing the mirrored VM and retrying, or rolling back to a previous snapshot and retrying. Future versions may enhance automatic recovery, but for now this would be up to the administrator.
|
||||
"""
|
||||
|
||||
connections_config = get_store(CLI_CONFIG["store_path"])
|
||||
if destination in connections_config.keys():
|
||||
destination_cluster_config = connections_config[destination]
|
||||
destination_api_uri = "{}://{}:{}{}".format(
|
||||
destination_cluster_config["scheme"],
|
||||
destination_cluster_config["host"],
|
||||
destination_cluster_config["port"],
|
||||
CLI_CONFIG["api_prefix"],
|
||||
)
|
||||
destination_api_key = destination_cluster_config["api_key"]
|
||||
else:
|
||||
destination_api_uri = destination
|
||||
destination_api_key = destination_api_key
|
||||
|
||||
retcode, retmsg = pvc.lib.vm.vm_send_snapshot(
|
||||
CLI_CONFIG,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=CLI_CONFIG.get("verify_ssl"),
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
incremental_parent=incremental_parent,
|
||||
wait_flag=wait_flag,
|
||||
)
|
||||
|
||||
if retcode and wait_flag:
|
||||
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
|
||||
finish(retcode, retmsg)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# > pvc vm backup
|
||||
###############################################################################
|
||||
@ -6684,7 +6588,6 @@ cli_vm_snapshot.add_command(cli_vm_snapshot_remove)
|
||||
cli_vm_snapshot.add_command(cli_vm_snapshot_rollback)
|
||||
cli_vm_snapshot.add_command(cli_vm_snapshot_export)
|
||||
cli_vm_snapshot.add_command(cli_vm_snapshot_import)
|
||||
cli_vm_snapshot.add_command(cli_vm_snapshot_send)
|
||||
cli_vm.add_command(cli_vm_snapshot)
|
||||
cli_vm_backup.add_command(cli_vm_backup_create)
|
||||
cli_vm_backup.add_command(cli_vm_backup_restore)
|
||||
|
@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
||||
|
||||
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
|
||||
|
||||
vm_states = ["start", "disable", "mirror"]
|
||||
vm_states = ["start", "disable"]
|
||||
vm_states.extend(
|
||||
[
|
||||
state
|
||||
for state in data.get("vms", {}).keys()
|
||||
if state not in ["total", "start", "disable", "mirror"]
|
||||
if state not in ["total", "start", "disable"]
|
||||
]
|
||||
)
|
||||
|
||||
@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
||||
continue
|
||||
if state in ["start"]:
|
||||
state_colour = ansii["green"]
|
||||
elif state in ["migrate", "disable", "provision", "mirror"]:
|
||||
elif state in ["migrate", "disable", "provision"]:
|
||||
state_colour = ansii["blue"]
|
||||
elif state in ["stop", "fail"]:
|
||||
state_colour = ansii["red"]
|
||||
|
@ -19,8 +19,6 @@
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
import sys
|
||||
|
||||
from click import progressbar
|
||||
from time import sleep, time
|
||||
|
||||
@ -107,7 +105,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
|
||||
|
||||
# Start following the task state, updating progress as we go
|
||||
total_task = task_status.get("total")
|
||||
with progressbar(length=total_task, width=20, show_eta=False) as bar:
|
||||
with progressbar(length=total_task, show_eta=False) as bar:
|
||||
last_task = 0
|
||||
maxlen = 21
|
||||
echo(
|
||||
@ -117,39 +115,30 @@ def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
|
||||
)
|
||||
while True:
|
||||
sleep(0.5)
|
||||
|
||||
task_status = pvc.lib.common.task_status(
|
||||
CLI_CONFIG, task_id=task_id, is_watching=True
|
||||
)
|
||||
|
||||
if isinstance(task_status, tuple):
|
||||
continue
|
||||
if task_status.get("state") != "RUNNING":
|
||||
break
|
||||
if task_status.get("current") == 0:
|
||||
continue
|
||||
|
||||
current_task = int(task_status.get("current"))
|
||||
total_task = int(task_status.get("total"))
|
||||
bar.length = total_task
|
||||
|
||||
if current_task > last_task:
|
||||
if task_status.get("current") > last_task:
|
||||
current_task = int(task_status.get("current"))
|
||||
total_task = int(task_status.get("total"))
|
||||
bar.length = total_task
|
||||
bar.update(current_task - last_task)
|
||||
last_task = current_task
|
||||
|
||||
curlen = len(str(task_status.get("status")))
|
||||
if curlen > maxlen:
|
||||
maxlen = curlen
|
||||
lendiff = maxlen - curlen
|
||||
overwrite_whitespace = " " * lendiff
|
||||
|
||||
percent_complete = (current_task / total_task) * 100
|
||||
bar_output = f"[{bar.format_bar()}] {percent_complete:3.0f}%"
|
||||
sys.stdout.write(
|
||||
f"\r {bar_output} {task_status['status']}{overwrite_whitespace}"
|
||||
# The extensive spaces at the end cause this to overwrite longer previous messages
|
||||
curlen = len(str(task_status.get("status")))
|
||||
if curlen > maxlen:
|
||||
maxlen = curlen
|
||||
lendiff = maxlen - curlen
|
||||
overwrite_whitespace = " " * lendiff
|
||||
echo(
|
||||
CLI_CONFIG,
|
||||
" " + task_status.get("status") + overwrite_whitespace,
|
||||
newline=False,
|
||||
)
|
||||
task_status = pvc.lib.common.task_status(
|
||||
CLI_CONFIG, task_id=task_id, is_watching=True
|
||||
)
|
||||
sys.stdout.flush()
|
||||
|
||||
if task_status.get("state") == "SUCCESS":
|
||||
bar.update(total_task - last_task)
|
||||
|
||||
|
@ -83,7 +83,7 @@ class UploadProgressBar(object):
|
||||
else:
|
||||
self.end_suffix = ""
|
||||
|
||||
self.bar = click.progressbar(length=self.length, width=20, show_eta=True)
|
||||
self.bar = click.progressbar(length=self.length, show_eta=True)
|
||||
|
||||
def update(self, monitor):
|
||||
bytes_cur = monitor.bytes_read
|
||||
|
@ -595,43 +595,6 @@ def vm_import_snapshot(
|
||||
return get_wait_retdata(response, wait_flag)
|
||||
|
||||
|
||||
def vm_send_snapshot(
|
||||
config,
|
||||
vm,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=True,
|
||||
destination_storage_pool=None,
|
||||
incremental_parent=None,
|
||||
wait_flag=True,
|
||||
):
|
||||
"""
|
||||
Send an (existing) snapshot of a VM's disks andconfiguration to a destination PVC cluster, optionally
|
||||
incremental with incremental_parent
|
||||
|
||||
API endpoint: POST /vm/{vm}/snapshot/send
|
||||
API arguments: snapshot_name=snapshot_name, destination_api_uri=destination_api_uri, destination_api_key=destination_api_key, incremental_parent=incremental_parent
|
||||
API schema: {"message":"{data}"}
|
||||
"""
|
||||
params = {
|
||||
"snapshot_name": snapshot_name,
|
||||
"destination_api_uri": destination_api_uri,
|
||||
"destination_api_key": destination_api_key,
|
||||
"destination_api_verify_ssl": destination_api_verify_ssl,
|
||||
}
|
||||
if destination_storage_pool is not None:
|
||||
params["destination_storage_pool"] = destination_storage_pool
|
||||
if incremental_parent is not None:
|
||||
params["incremental_parent"] = incremental_parent
|
||||
|
||||
response = call_api(
|
||||
config, "post", "/vm/{vm}/snapshot/send".format(vm=vm), params=params
|
||||
)
|
||||
|
||||
return get_wait_retdata(response, wait_flag)
|
||||
|
||||
|
||||
def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_flag=True):
|
||||
"""
|
||||
Perform a cluster VM autobackup
|
||||
@ -1797,7 +1760,6 @@ def format_info(config, domain_information, long_output):
|
||||
"provision": ansiprint.blue(),
|
||||
"restore": ansiprint.blue(),
|
||||
"import": ansiprint.blue(),
|
||||
"mirror": ansiprint.blue(),
|
||||
}
|
||||
ainformation.append(
|
||||
"{}State:{} {}{}{}".format(
|
||||
@ -2350,6 +2312,9 @@ def format_list(config, vm_list):
|
||||
ansiprint.end(),
|
||||
)
|
||||
)
|
||||
# Fix the length due to the extra fake characters
|
||||
vm_nets_length -= len(net_vni)
|
||||
vm_nets_length += len(net_string_list[net_idx])
|
||||
else:
|
||||
net_string_list.append(net_vni)
|
||||
|
||||
@ -2379,8 +2344,7 @@ def format_list(config, vm_list):
|
||||
vm_state=domain_information["state"],
|
||||
vm_tags=",".join(tag_list),
|
||||
vm_snapshots=len(domain_information.get("snapshots", list())),
|
||||
vm_networks=",".join(net_string_list)
|
||||
+ ("" if all(net_invalid_list) else " "),
|
||||
vm_networks=",".join(net_string_list),
|
||||
vm_memory=domain_information["memory"],
|
||||
vm_vcpu=domain_information["vcpu"],
|
||||
vm_node=domain_information["node"],
|
||||
|
@ -1092,17 +1092,17 @@ def rollback_snapshot(zkhandler, pool, volume, name):
|
||||
),
|
||||
)
|
||||
|
||||
# 1. Roll back the snapshot
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"rbd snap rollback {}/{}@{}".format(pool, volume, name)
|
||||
)
|
||||
if retcode:
|
||||
return (
|
||||
False,
|
||||
'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
|
||||
volume, pool, name, stderr
|
||||
),
|
||||
# 1. Roll back the snapshot
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"rbd snap rollback {}/{}@{}".format(pool, volume, name)
|
||||
)
|
||||
if retcode:
|
||||
return (
|
||||
False,
|
||||
'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
|
||||
volume, pool, name, stderr
|
||||
),
|
||||
)
|
||||
|
||||
return True, 'Rolled back RBD volume "{}" in pool "{}" to snapshot "{}".'.format(
|
||||
volume, pool, name
|
||||
@ -1168,14 +1168,11 @@ def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzz
|
||||
continue
|
||||
if target_volume and volume_name != target_volume:
|
||||
continue
|
||||
try:
|
||||
snapshot_stats = json.loads(
|
||||
zkhandler.read(
|
||||
("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
|
||||
)
|
||||
snapshot_stats = json.loads(
|
||||
zkhandler.read(
|
||||
("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
|
||||
)
|
||||
except Exception:
|
||||
snapshot_stats = []
|
||||
)
|
||||
if limit:
|
||||
try:
|
||||
if re.fullmatch(limit, snapshot_name):
|
||||
|
@ -1160,7 +1160,6 @@ def get_resource_metrics(zkhandler):
|
||||
"fail": 8,
|
||||
"import": 9,
|
||||
"restore": 10,
|
||||
"mirror": 99,
|
||||
}
|
||||
state = vm["state"]
|
||||
output_lines.append(
|
||||
|
@ -85,7 +85,6 @@ vm_state_combinations = [
|
||||
"provision",
|
||||
"import",
|
||||
"restore",
|
||||
"mirror",
|
||||
]
|
||||
ceph_osd_state_combinations = [
|
||||
"up,in",
|
||||
|
@ -580,7 +580,7 @@ def rename_vm(zkhandler, domain, new_domain):
|
||||
|
||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||
state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if state not in ["stop", "disable", "mirror"]:
|
||||
if state not in ["stop", "disable"]:
|
||||
return (
|
||||
False,
|
||||
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
|
||||
@ -1125,7 +1125,6 @@ def get_list(
|
||||
"migrate",
|
||||
"unmigrate",
|
||||
"provision",
|
||||
"mirror",
|
||||
]
|
||||
if state not in valid_states:
|
||||
return False, 'VM state "{}" is not valid.'.format(state)
|
||||
@ -1904,10 +1903,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||
|
||||
# Check that the domain is stopped (unless force_unlock is set)
|
||||
domain_state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
|
||||
if not force_unlock and domain_state not in ["stop", "disable", "fail"]:
|
||||
fail(
|
||||
celery,
|
||||
f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
|
||||
f"VM state {domain_state} not in [stop, disable, fail] and not forcing",
|
||||
)
|
||||
return False
|
||||
|
||||
@ -2330,7 +2329,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
|
||||
|
||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||
state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if state not in ["stop", "disable", "mirror"]:
|
||||
if state not in ["stop", "disable"]:
|
||||
fail(
|
||||
celery,
|
||||
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
|
||||
@ -3119,560 +3118,3 @@ def vm_worker_import_snapshot(
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
|
||||
def vm_worker_send_snapshot(
|
||||
zkhandler,
|
||||
celery,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=True,
|
||||
incremental_parent=None,
|
||||
destination_storage_pool=None,
|
||||
):
|
||||
import requests
|
||||
import rados
|
||||
import rbd
|
||||
from packaging.version import parse as parse_version
|
||||
|
||||
current_stage = 0
|
||||
total_stages = 1
|
||||
start(
|
||||
celery,
|
||||
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Validate that VM exists in cluster
|
||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||
if not dom_uuid:
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find VM '{domain}' in the cluster",
|
||||
)
|
||||
return False
|
||||
|
||||
# Get our side's VM configuration details
|
||||
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
|
||||
if not isinstance(vm_detail, dict):
|
||||
fail(
|
||||
celery,
|
||||
f"VM listing returned invalid data: {vm_detail}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if the snapshot exists
|
||||
if not zkhandler.exists(
|
||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if the incremental parent exists
|
||||
if incremental_parent is not None and not zkhandler.exists(
|
||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||
)
|
||||
return False
|
||||
|
||||
vm_name = vm_detail["name"]
|
||||
|
||||
# Validate that the destination cluster can be reached
|
||||
destination_api_timeout = (3.05, 172800)
|
||||
destination_api_headers = {
|
||||
"X-Api-Key": destination_api_key,
|
||||
}
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update(destination_api_headers)
|
||||
session.verify = destination_api_verify_ssl
|
||||
session.timeout = destination_api_timeout
|
||||
|
||||
try:
|
||||
# Hit the API root; this should return "PVC API version x"
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/",
|
||||
timeout=destination_api_timeout,
|
||||
params=None,
|
||||
data=None,
|
||||
)
|
||||
if "PVC API" not in response.json().get("message"):
|
||||
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API timed out: {e}",
|
||||
)
|
||||
return False
|
||||
except ValueError as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API is not valid: {e}",
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API failed: {e}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Hit the API "/status" endpoint to validate API key and cluster status
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/status",
|
||||
params=None,
|
||||
data=None,
|
||||
)
|
||||
destination_cluster_status = response.json()
|
||||
current_destination_pvc_version = destination_cluster_status.get(
|
||||
"pvc_version", None
|
||||
)
|
||||
if current_destination_pvc_version is None:
|
||||
fail(
|
||||
celery,
|
||||
"Connection to remote API failed: no PVC version information returned",
|
||||
)
|
||||
return False
|
||||
|
||||
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
|
||||
# Work around development versions
|
||||
current_destination_pvc_version = re.sub(
|
||||
r"~git-.*", "", current_destination_pvc_version
|
||||
)
|
||||
# Compare versions
|
||||
if parse_version(current_destination_pvc_version) < parse_version(
|
||||
expected_destination_pvc_version
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if the VM already exists on the remote
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/vm/{domain}",
|
||||
params=None,
|
||||
data=None,
|
||||
)
|
||||
destination_vm_status = response.json()
|
||||
if type(destination_vm_status) is list and len(destination_vm_status) > 0:
|
||||
destination_vm_status = destination_vm_status[0]
|
||||
else:
|
||||
destination_vm_status = {}
|
||||
|
||||
current_destination_vm_state = destination_vm_status.get("state", None)
|
||||
if (
|
||||
current_destination_vm_state is not None
|
||||
and current_destination_vm_state != "mirror"
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
"Remote PVC VM exists and is not a mirror",
|
||||
)
|
||||
return False
|
||||
|
||||
# Get details about VM snapshot
|
||||
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
|
||||
[
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.name",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.timestamp",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.xml",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.rbd_snapshots",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
|
||||
|
||||
# Get details about remote VM snapshots
|
||||
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
|
||||
|
||||
# Check if this snapshot is in the remote list already
|
||||
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
|
||||
fail(
|
||||
celery,
|
||||
f"Snapshot {snapshot_name} already exists on the target",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if this snapshot is older than the latest remote VM snapshot
|
||||
if (
|
||||
len(destination_vm_snapshots) > 0
|
||||
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check that our incremental parent exists on the remote VM
|
||||
if incremental_parent is not None:
|
||||
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
|
||||
fail(
|
||||
celery,
|
||||
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
|
||||
)
|
||||
return False
|
||||
|
||||
# Begin send, set stages
|
||||
total_stages = 2 + (3 * len(snapshot_rbdsnaps))
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Sending VM configuration for {vm_name}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
send_params = {
|
||||
"snapshot": snapshot_name,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
try:
|
||||
response = session.post(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
|
||||
headers={"Content-Type": "application/json"},
|
||||
params=send_params,
|
||||
json=vm_detail,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send config: {e}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Create the block devices on the remote side if this is a new VM send
|
||||
block_t_start = time.time()
|
||||
block_total_mb = 0
|
||||
|
||||
for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]:
|
||||
rbd_name = rbd_detail["name"]
|
||||
pool, volume = rbd_name.split("/")
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Preparing remote volume for {rbd_name}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Get the storage volume details
|
||||
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
|
||||
if not retcode or len(retdata) != 1:
|
||||
if len(retdata) < 1:
|
||||
error_message = f"No detail returned for volume {rbd_name}"
|
||||
elif len(retdata) > 1:
|
||||
error_message = f"Multiple details returned for volume {rbd_name}"
|
||||
else:
|
||||
error_message = f"Error getting details for volume {rbd_name}"
|
||||
fail(
|
||||
celery,
|
||||
error_message,
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||
except Exception as e:
|
||||
error_message = f"Failed to get volume size for {rbd_name}: {e}"
|
||||
|
||||
if destination_storage_pool is not None:
|
||||
pool = destination_storage_pool
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Checking remote volume {rbd_name} for compliance",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Check if the volume exists on the target
|
||||
response = session.get(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
params=None,
|
||||
data=None,
|
||||
)
|
||||
if response.status_code != 404 and current_destination_vm_state is None:
|
||||
fail(
|
||||
celery,
|
||||
f"Remote storage pool {pool} already contains volume {volume}",
|
||||
)
|
||||
return False
|
||||
|
||||
if current_destination_vm_state is not None:
|
||||
try:
|
||||
remote_volume_size = ceph.format_bytes_fromhuman(
|
||||
response.json()[0]["stats"]["size"]
|
||||
)
|
||||
except Exception as e:
|
||||
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
|
||||
fail(celery, error_message)
|
||||
return False
|
||||
|
||||
if local_volume_size != remote_volume_size:
|
||||
response = session.put(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
params={"new_size": local_volume_size, "force": True},
|
||||
)
|
||||
if response.status_code != 200:
|
||||
fail(
|
||||
celery,
|
||||
"Failed to resize remote volume to match local volume",
|
||||
)
|
||||
return False
|
||||
|
||||
# Send the volume to the remote
|
||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
|
||||
size = image.size()
|
||||
chunk_size_mb = 1024
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Diff between incremental_parent and snapshot
|
||||
celery_message = (
|
||||
f"Sending diff of {rbd_name}@{incremental_parent} → {snapshot_name}"
|
||||
)
|
||||
else:
|
||||
# Full image transfer
|
||||
celery_message = f"Sending full image of {rbd_name}@{snapshot_name}"
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
celery_message,
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Createa single session to reuse connections
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
|
||||
session.params.update(send_params)
|
||||
|
||||
# Send 32 objects (128MB) at once
|
||||
send_max_objects = 32
|
||||
batch_size_mb = 4 * send_max_objects
|
||||
batch_size = batch_size_mb * 1024 * 1024
|
||||
|
||||
total_chunks = 0
|
||||
|
||||
def diff_cb_count(offset, length, exists):
|
||||
nonlocal total_chunks
|
||||
if exists:
|
||||
total_chunks += 1
|
||||
|
||||
current_chunk = 0
|
||||
buffer = list()
|
||||
buffer_size = 0
|
||||
last_chunk_time = time.time()
|
||||
|
||||
def send_batch_multipart(buffer):
|
||||
nonlocal last_chunk_time
|
||||
files = {}
|
||||
for i in range(len(buffer)):
|
||||
files[f"object_{i}"] = (
|
||||
f"object_{i}",
|
||||
buffer[i],
|
||||
"application/octet-stream",
|
||||
)
|
||||
try:
|
||||
response = session.put(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
files=files,
|
||||
stream=True,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send diff batch ({e}): {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
|
||||
current_chunk_time = time.time()
|
||||
chunk_time = current_chunk_time - last_chunk_time
|
||||
last_chunk_time = current_chunk_time
|
||||
chunk_speed = round(batch_size_mb / chunk_time, 1)
|
||||
update(
|
||||
celery,
|
||||
celery_message + f" ({chunk_speed} MB/s)",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
def add_block_to_multipart(buffer, offset, length, data):
|
||||
part_data = (
|
||||
offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data
|
||||
) # Add header and data
|
||||
buffer.append(part_data)
|
||||
|
||||
def diff_cb_send(offset, length, exists):
|
||||
nonlocal current_chunk, buffer, buffer_size
|
||||
if exists:
|
||||
# Read the data for the current block
|
||||
data = image.read(offset, length)
|
||||
# Add the block to the multipart buffer
|
||||
add_block_to_multipart(buffer, offset, length, data)
|
||||
current_chunk += 1
|
||||
buffer_size += len(data)
|
||||
if buffer_size >= batch_size:
|
||||
send_batch_multipart(buffer)
|
||||
buffer.clear() # Clear the buffer after sending
|
||||
buffer_size = 0 # Reset buffer size
|
||||
|
||||
try:
|
||||
image.set_snap(snapshot_name)
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_count, whole_object=True
|
||||
)
|
||||
block_total_mb += total_chunks * 4
|
||||
image.diff_iterate(
|
||||
0, size, incremental_parent, diff_cb_send, whole_object=True
|
||||
)
|
||||
|
||||
if buffer:
|
||||
send_batch_multipart(buffer)
|
||||
buffer.clear() # Clear the buffer after sending
|
||||
buffer_size = 0 # Reset buffer size
|
||||
except Exception:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
else:
|
||||
|
||||
def full_chunker():
|
||||
nonlocal block_total_mb
|
||||
chunk_size = 1024 * 1024 * chunk_size_mb
|
||||
current_chunk = 0
|
||||
last_chunk_time = time.time()
|
||||
while current_chunk < size:
|
||||
chunk = image.read(current_chunk, chunk_size)
|
||||
yield chunk
|
||||
current_chunk += chunk_size
|
||||
block_total_mb += len(chunk) / 1024 / 1024
|
||||
current_chunk_time = time.time()
|
||||
chunk_time = current_chunk_time - last_chunk_time
|
||||
last_chunk_time = current_chunk_time
|
||||
chunk_speed = round(chunk_size_mb / chunk_time, 1)
|
||||
update(
|
||||
celery,
|
||||
celery_message + f" ({chunk_speed} MB/s)",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
"size": size,
|
||||
"source_snapshot": incremental_parent,
|
||||
}
|
||||
|
||||
try:
|
||||
response = session.post(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
params=send_params,
|
||||
data=full_chunker(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
send_params = {
|
||||
"pool": pool,
|
||||
"volume": volume,
|
||||
"snapshot": snapshot_name,
|
||||
}
|
||||
try:
|
||||
response = session.patch(
|
||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
|
||||
params=send_params,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {response.json()['message']}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
block_t_end = time.time()
|
||||
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
|
||||
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery,
|
||||
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
@ -33,7 +33,6 @@ from daemon_lib.vm import (
|
||||
vm_worker_rollback_snapshot,
|
||||
vm_worker_export_snapshot,
|
||||
vm_worker_import_snapshot,
|
||||
vm_worker_send_snapshot,
|
||||
)
|
||||
from daemon_lib.ceph import (
|
||||
osd_worker_add_osd,
|
||||
@ -228,54 +227,6 @@ def vm_import_snapshot(
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
|
||||
def vm_send_snapshot(
|
||||
self,
|
||||
domain=None,
|
||||
snapshot_name=None,
|
||||
destination_api_uri="",
|
||||
destination_api_key="",
|
||||
destination_api_verify_ssl=True,
|
||||
incremental_parent=None,
|
||||
destination_storage_pool=None,
|
||||
run_on="primary",
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_vm_send_snapshot(
|
||||
zkhandler,
|
||||
self,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=True,
|
||||
incremental_parent=None,
|
||||
destination_storage_pool=None,
|
||||
):
|
||||
return vm_worker_send_snapshot(
|
||||
zkhandler,
|
||||
self,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||
incremental_parent=incremental_parent,
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
)
|
||||
|
||||
return run_vm_send_snapshot(
|
||||
self,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||
incremental_parent=incremental_parent,
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
||||
def osd_add(
|
||||
self,
|
||||
|
Loading…
x
Reference in New Issue
Block a user