Compare commits
5 Commits
master
...
snapshot-s
| Author | SHA1 | Date |
|---|---|---|
| Joshua Boniface | fe1740ecd9 | |
| Joshua Boniface | 5c3ce358df | |
| Joshua Boniface | cf5ee23a18 | |
| Joshua Boniface | c766648ed0 | |
| Joshua Boniface | 6960513bbd | |
@ -26,6 +26,6 @@ from os import path
|
|||
current_dir = path.dirname(path.abspath(__file__))
|
||||
sys.path.append(current_dir)
|
||||
|
||||
import pvcapid.Daemon # noqa: F401, E402
|
||||
import pvcapid.Daemon # noqa: F401
|
||||
|
||||
pvcapid.Daemon.entrypoint()
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
#
|
||||
###############################################################################
|
||||
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
from ssl import SSLContext, TLSVersion
|
||||
from distutils.util import strtobool as dustrtobool
|
||||
|
@ -35,7 +37,6 @@ API_VERSION = 1.0
|
|||
# Helper Functions
|
||||
##########################################################
|
||||
|
||||
|
||||
def strtobool(stringv):
|
||||
if stringv is None:
|
||||
return False
|
||||
|
@ -61,7 +62,6 @@ config["daemon_version"] = version
|
|||
# Flask App Creation for Gunicorn
|
||||
##########################################################
|
||||
|
||||
|
||||
def create_app():
|
||||
"""
|
||||
Create and return the Flask app and SSL context if necessary.
|
||||
|
@ -101,9 +101,8 @@ def create_app():
|
|||
# Entrypoint
|
||||
##########################################################
|
||||
|
||||
|
||||
def entrypoint():
|
||||
if config["debug"]:
|
||||
if config['debug']:
|
||||
app = create_app()
|
||||
|
||||
if config["api_ssl_enabled"]:
|
||||
|
@ -125,30 +124,21 @@ def entrypoint():
|
|||
else:
|
||||
# Build the command to run Gunicorn
|
||||
gunicorn_cmd = [
|
||||
"gunicorn",
|
||||
"--workers",
|
||||
"1",
|
||||
"--threads",
|
||||
"8",
|
||||
"--timeout",
|
||||
"86400",
|
||||
"--bind",
|
||||
"{}:{}".format(config["api_listen_address"], config["api_listen_port"]),
|
||||
"pvcapid.Daemon:create_app()",
|
||||
"--log-level",
|
||||
"info",
|
||||
"--access-logfile",
|
||||
"-",
|
||||
"--error-logfile",
|
||||
"-",
|
||||
'gunicorn',
|
||||
'--workers', '1',
|
||||
'--threads', '8',
|
||||
'--timeout', '86400',
|
||||
'--bind', '{}:{}'.format(config["api_listen_address"], config["api_listen_port"]),
|
||||
'pvcapid.Daemon:create_app()',
|
||||
'--log-level', 'info',
|
||||
'--access-logfile', '-',
|
||||
'--error-logfile', '-',
|
||||
]
|
||||
|
||||
if config["api_ssl_enabled"]:
|
||||
gunicorn_cmd += [
|
||||
"--certfile",
|
||||
config["api_ssl_cert_file"],
|
||||
"--keyfile",
|
||||
config["api_ssl_key_file"],
|
||||
'--certfile', config["api_ssl_cert_file"],
|
||||
'--keyfile', config["api_ssl_key_file"]
|
||||
]
|
||||
|
||||
# Run Gunicorn
|
||||
|
|
|
@ -2801,7 +2801,7 @@ class API_VM_Locks(Resource):
|
|||
- vm
|
||||
responses:
|
||||
202:
|
||||
description: OK
|
||||
description: Accepted
|
||||
schema:
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
|
@ -2924,11 +2924,11 @@ class API_VM_Device(Resource):
|
|||
required: true
|
||||
description: The raw Libvirt XML definition of the device to attach
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -2978,11 +2978,11 @@ class API_VM_Device(Resource):
|
|||
required: true
|
||||
description: The raw Libvirt XML definition of the device to detach
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -3234,11 +3234,11 @@ class API_VM_Snapshot(Resource):
|
|||
required: false
|
||||
description: A custom name for the snapshot instead of autogeneration by date
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
|
@ -3292,11 +3292,11 @@ class API_VM_Snapshot(Resource):
|
|||
required: true
|
||||
description: The name of the snapshot to remove
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
|
@ -3356,11 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
|
|||
required: true
|
||||
description: The name of the snapshot to roll back to
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
|
@ -3436,15 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
|
|||
description: The absolute file path to export the snapshot to on the active primary coordinator
|
||||
- in: query
|
||||
name: incremental_parent
|
||||
type: boolean
|
||||
type: string
|
||||
required: false
|
||||
description: A snapshot name to generate an incremental diff from
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
|
@ -3529,11 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
|
|||
default: true
|
||||
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
|
@ -3572,6 +3572,220 @@ class API_VM_Snapshot_Import(Resource):
|
|||
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
|
||||
|
||||
|
||||
# /vm/<vm>/snapshot/send
|
||||
class API_VM_Snapshot_Send(Resource):
|
||||
@RequestParser(
|
||||
[
|
||||
{
|
||||
"name": "snapshot_name",
|
||||
"required": True,
|
||||
"helptext": "A snapshot name must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_uri",
|
||||
"required": True,
|
||||
"helptext": "A destination API URI must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_key",
|
||||
"required": True,
|
||||
"helptext": "A destination API key must be specified",
|
||||
},
|
||||
{
|
||||
"name": "destination_api_verify_ssl",
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"name": "incremental_parent",
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"name": "destination_storage_pool",
|
||||
"required": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
def post(self, vm, reqargs):
|
||||
"""
|
||||
Send a snapshot of a VM's disks and configuration to another PVC cluster
|
||||
---
|
||||
tags:
|
||||
- vm
|
||||
parameters:
|
||||
- in: query
|
||||
name: snapshot_name
|
||||
type: string
|
||||
required: true
|
||||
description: The name of the snapshot to export (must exist)
|
||||
- in: query
|
||||
name: destination_api_uri
|
||||
type: string
|
||||
required: true
|
||||
description: The base API URI of the destination PVC cluster (with prefix if applicable)
|
||||
- in: query
|
||||
name: destination_api_key
|
||||
type: string
|
||||
required: true
|
||||
description: The API authentication key of the destination PVC cluster
|
||||
- in: query
|
||||
name: destination_api_verify_ssl
|
||||
type: boolean
|
||||
required: false
|
||||
default: true
|
||||
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
|
||||
- in: query
|
||||
name: incremental_parent
|
||||
type: string
|
||||
required: false
|
||||
description: A snapshot name to generate an incremental diff from; incremental send only if unset
|
||||
- in: query
|
||||
name: destination_storage_pool
|
||||
type: string
|
||||
required: false
|
||||
default: source storage pool name
|
||||
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
|
||||
responses:
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Execution error
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
snapshot_name = reqargs.get("snapshot_name", None)
|
||||
destination_api_uri = reqargs.get("destination_api_uri", None)
|
||||
destination_api_key = reqargs.get("destination_api_key", None)
|
||||
destination_api_verify_ssl = vool(strtobool(reqargs.get("destination_api_verify_ssl", "true")))
|
||||
incremental_parent = reqargs.get("incremental_parent", None)
|
||||
destination_storage_pool = reqargs.get("destination_storage_pool", None)
|
||||
|
||||
task = run_celery_task(
|
||||
"vm.send_snapshot",
|
||||
domain=vm,
|
||||
snapshot_name=snapshot_name,
|
||||
destination_api_uri=destination_api_uri,
|
||||
destination_api_key=destination_api_key,
|
||||
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||
incremental_parent=incremental_parent,
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
run_on="primary",
|
||||
)
|
||||
|
||||
return (
|
||||
{
|
||||
"task_id": task.id,
|
||||
"task_name": "vm.send_snapshot",
|
||||
"run_on": f"{get_primary_node()} (primary)",
|
||||
},
|
||||
202,
|
||||
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
|
||||
)
|
||||
|
||||
|
||||
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
|
||||
|
||||
|
||||
# /vm/<vm>/snapshot/receive/block
class API_VM_Snapshot_Receive_Block(Resource):
    @RequestParser(
        [
            {
                "name": "pool",
                "required": True,
            },
            {
                "name": "volume",
                "required": True,
            },
            {
                "name": "snapshot",
                "required": True,
            },
            {
                "name": "size",
                "required": True,
            },
            {
                "name": "source_snapshot",
                "required": False,
            },
        ]
    )
    @Authenticator
    def post(self, vm, reqargs):
        """
        Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental

        NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
        ---
        tags:
          - vm
        parameters:
          - in: query
            name: pool
            type: string
            required: true
            description: The name of the destination Ceph RBD data pool
          - in: query
            name: volume
            type: string
            required: true
            description: The name of the destination Ceph RBD volume
          - in: query
            name: snapshot
            type: string
            required: true
            description: The name of the destination Ceph RBD volume snapshot
          - in: query
            name: size
            type: integer
            required: true
            description: The size in bytes of the Ceph RBD volume
          - in: query
            name: source_snapshot
            type: string
            required: false
            description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
        responses:
          200:
            description: OK
            schema:
              type: object
              id: Message
          400:
            description: Execution error
            schema:
              type: object
              id: Message
          404:
            description: Not found
            schema:
              type: object
              id: Message
        """
        # Unpack the query arguments; the raw block data arrives on the
        # request body stream and is handed to the helper untouched.
        dest_pool = reqargs.get("pool")
        # The in-flight volume is suffixed "_recv" until the import completes
        dest_volume = reqargs.get("volume") + "_recv"
        dest_snapshot = reqargs.get("snapshot")
        volume_size = int(reqargs.get("size"))
        parent_snapshot = reqargs.get("source_snapshot")

        return api_helper.vm_snapshot_receive_block(
            dest_pool,
            dest_volume,
            dest_snapshot,
            volume_size,
            flask.request.stream,
            source_snapshot=parent_snapshot,
        )
|
||||
|
||||
|
||||
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
|
||||
|
||||
|
||||
# /vm/autobackup
|
||||
class API_VM_Autobackup_Root(Resource):
|
||||
@RequestParser(
|
||||
|
@ -3601,14 +3815,11 @@ class API_VM_Autobackup_Root(Resource):
|
|||
type: string
|
||||
example: "user@domain.tld"
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
task_id:
|
||||
type: string
|
||||
description: Task ID for the provisioner Celery worker
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -5058,7 +5269,95 @@ class API_Storage_Ceph_Benchmark(Resource):
|
|||
description: The PVC benchmark format of the results
|
||||
benchmark_result:
|
||||
type: object
|
||||
description: A benchmark test result; format not documented due to complexity
|
||||
description: A format 0 test result
|
||||
properties:
|
||||
test_name:
|
||||
type: object
|
||||
properties:
|
||||
overall:
|
||||
type: object
|
||||
properties:
|
||||
iosize:
|
||||
type: string (integer)
|
||||
description: The total size of the benchmark data
|
||||
bandwidth:
|
||||
type: string (integer)
|
||||
description: The average bandwidth (KiB/s)
|
||||
iops:
|
||||
type: string (integer)
|
||||
description: The average IOPS
|
||||
runtime:
|
||||
type: string (integer)
|
||||
description: The total test time in milliseconds
|
||||
latency:
|
||||
type: object
|
||||
properties:
|
||||
min:
|
||||
type: string (integer)
|
||||
description: The minimum latency measurement
|
||||
max:
|
||||
type: string (integer)
|
||||
description: The maximum latency measurement
|
||||
mean:
|
||||
type: string (float)
|
||||
description: The mean latency measurement
|
||||
stdev:
|
||||
type: string (float)
|
||||
description: The standard deviation of latency
|
||||
bandwidth:
|
||||
type: object
|
||||
properties:
|
||||
min:
|
||||
type: string (integer)
|
||||
description: The minimum bandwidth (KiB/s) measurement
|
||||
max:
|
||||
type: string (integer)
|
||||
description: The maximum bandwidth (KiB/s) measurement
|
||||
mean:
|
||||
type: string (float)
|
||||
description: The mean bandwidth (KiB/s) measurement
|
||||
stdev:
|
||||
type: string (float)
|
||||
description: The standard deviation of bandwidth
|
||||
numsamples:
|
||||
type: string (integer)
|
||||
description: The number of samples taken during the test
|
||||
iops:
|
||||
type: object
|
||||
properties:
|
||||
min:
|
||||
type: string (integer)
|
||||
description: The minimum IOPS measurement
|
||||
max:
|
||||
type: string (integer)
|
||||
description: The maximum IOPS measurement
|
||||
mean:
|
||||
type: string (float)
|
||||
description: The mean IOPS measurement
|
||||
stdev:
|
||||
type: string (float)
|
||||
description: The standard deviation of IOPS
|
||||
numsamples:
|
||||
type: string (integer)
|
||||
description: The number of samples taken during the test
|
||||
cpu:
|
||||
type: object
|
||||
properties:
|
||||
user:
|
||||
type: string (float percentage)
|
||||
description: The percentage of test time spent in user space
|
||||
system:
|
||||
type: string (float percentage)
|
||||
description: The percentage of test time spent in system (kernel) space
|
||||
ctxsw:
|
||||
type: string (integer)
|
||||
description: The number of context switches during the test
|
||||
majfault:
|
||||
type: string (integer)
|
||||
description: The number of major page faults during the test
|
||||
minfault:
|
||||
type: string (integer)
|
||||
description: The number of minor page faults during the test
|
||||
"""
|
||||
return list_benchmarks(config, reqargs.get("job", None))
|
||||
|
||||
|
@ -5069,10 +5368,6 @@ class API_Storage_Ceph_Benchmark(Resource):
|
|||
"required": True,
|
||||
"helptext": "A valid pool must be specified.",
|
||||
},
|
||||
{
|
||||
"name": "name",
|
||||
"required": False,
|
||||
},
|
||||
]
|
||||
)
|
||||
@Authenticator
|
||||
|
@ -5088,17 +5383,17 @@ class API_Storage_Ceph_Benchmark(Resource):
|
|||
type: string
|
||||
required: true
|
||||
description: The PVC storage pool to benchmark
|
||||
- in: query
|
||||
name: name
|
||||
type: string
|
||||
required: false
|
||||
description: An optional override name for the job
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: string
|
||||
description: The Celery job ID of the benchmark (unused elsewhere)
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
# Verify that the pool is valid
|
||||
_list, code = api_helper.ceph_pool_list(
|
||||
|
@ -5110,10 +5405,7 @@ class API_Storage_Ceph_Benchmark(Resource):
|
|||
}, 400
|
||||
|
||||
task = run_celery_task(
|
||||
"storage.benchmark",
|
||||
pool=reqargs.get("pool", None),
|
||||
name=reqargs.get("name", None),
|
||||
run_on="primary",
|
||||
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
|
||||
)
|
||||
return (
|
||||
{
|
||||
|
@ -5225,11 +5517,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
|
|||
required: true
|
||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -5424,11 +5716,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
|||
required: false
|
||||
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -5539,11 +5831,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
|||
required: false
|
||||
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -5597,11 +5889,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
|||
required: true
|
||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
@ -5655,7 +5947,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
|||
- in: query
|
||||
name: force
|
||||
type: boolean
|
||||
required: flase
|
||||
required: false
|
||||
description: Force removal even if some step(s) fail
|
||||
- in: query
|
||||
name: yes-i-really-mean-it
|
||||
|
@ -5663,11 +5955,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
|||
required: true
|
||||
description: A confirmation string to ensure that the API consumer really means it
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
404:
|
||||
description: Not found
|
||||
schema:
|
||||
|
@ -6255,7 +6547,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
|
|||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: flase
|
||||
default: false
|
||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
|
@ -6332,7 +6624,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
|||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: flase
|
||||
default: false
|
||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
|
@ -6384,7 +6676,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
|||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: flase
|
||||
default: false
|
||||
description: Force action if new volume size would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
|
@ -6509,7 +6801,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
|
|||
name: force
|
||||
type: boolean
|
||||
required: false
|
||||
default: flase
|
||||
default: false
|
||||
description: Force action if clone volume size would violate 80% full soft cap on the pool
|
||||
responses:
|
||||
200:
|
||||
|
@ -9364,14 +9656,11 @@ class API_Provisioner_Create_Root(Resource):
|
|||
type: string
|
||||
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
202:
|
||||
description: Accepted
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
task_id:
|
||||
type: string
|
||||
description: Task ID for the provisioner Celery worker
|
||||
type: string
|
||||
description: The Celery job ID of the task
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
|
|
|
@ -1280,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
|
|||
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
|
||||
)
|
||||
|
||||
if retdata[0].get("state") not in ["stop", "disable"]:
|
||||
if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
|
||||
return {"message": "VM must be stopped to flush locks"}, 400
|
||||
|
||||
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
|
||||
|
@ -1294,6 +1294,58 @@ def vm_flush_locks(zkhandler, vm):
|
|||
return output, retcode
|
||||
|
||||
|
||||
def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
    """
    Receive a single RBD volume snapshot from a remote PVC cluster.

    Reads raw block data from ``stream`` and writes it into the RBD volume
    ``volume`` in Ceph pool ``pool``, then creates snapshot ``snapshot`` on it.

    If ``source_snapshot`` is set, the stream is an incremental diff: each
    chunk carries an 8-byte big-endian offset, an 8-byte big-endian length,
    and the data, applied at the given offset. Otherwise the stream is a raw
    full image written sequentially, and the destination volume (of ``size``
    bytes) is created first.

    Returns a ``({"message": ...}, code)`` tuple: 200 on success, 400 on any
    failure (including a missing rados/rbd binding).
    """
    try:
        import rados
        import rbd

        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
        cluster.connect()
        ioctx = cluster.open_ioctx(pool)

        # Ensure Ceph handles are released even if the transfer fails midway
        try:
            if not source_snapshot:
                # Full transfer: the destination volume must be created first
                rbd_inst = rbd.RBD()
                rbd_inst.create(ioctx, volume, size)

            image = rbd.Image(ioctx, volume)
            try:
                chunk_size = 1024 * 1024 * 128  # 128 MiB per read

                if source_snapshot:
                    # Receiving diff data
                    print(f"Applying diff between {source_snapshot} and {snapshot}")
                    while True:
                        chunk = stream.read(chunk_size)
                        if not chunk:
                            break

                        # Extract the offset and length (8 bytes each) and the data
                        # NOTE(review): assumes each read yields exactly one
                        # (offset, length, data) record — confirm sender framing
                        offset = int.from_bytes(chunk[:8], 'big')
                        length = int.from_bytes(chunk[8:16], 'big')
                        data = chunk[16:16 + length]
                        image.write(data, offset)
                else:
                    # Receiving full image
                    print(f"Importing full snapshot {snapshot}")
                    last_chunk = 0
                    while True:
                        # BUGFIX: read from the passed-in stream, not
                        # flask.request.stream — this helper has no request
                        # context of its own and must use its argument
                        chunk = stream.read(chunk_size)
                        if not chunk:
                            break
                        image.write(chunk, last_chunk)
                        last_chunk += len(chunk)

                image.create_snap(snapshot)
            finally:
                image.close()
        finally:
            ioctx.close()
            cluster.shutdown()
    except Exception as e:
        return {"message": f"Failed to import block device: {e}"}, 400

    # BUGFIX: the original fell off the end on success, returning None to the
    # Flask caller; return an explicit 200 response instead
    return {"message": f"Imported block {volume}@{snapshot} in pool {pool}"}, 200
|
||||
|
||||
|
||||
#
|
||||
# Network functions
|
||||
#
|
||||
|
|
|
@ -3755,13 +3755,6 @@ def cli_storage_benchmark():
|
|||
@click.command(name="run", short_help="Run a storage benchmark.")
|
||||
@connection_req
|
||||
@click.argument("pool")
|
||||
@click.option(
|
||||
"--name",
|
||||
"name",
|
||||
default=None,
|
||||
show_default=False,
|
||||
help="Use a custom name for the job",
|
||||
)
|
||||
@click.option(
|
||||
"--wait/--no-wait",
|
||||
"wait_flag",
|
||||
|
@ -3773,14 +3766,12 @@ def cli_storage_benchmark():
|
|||
@confirm_opt(
|
||||
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
|
||||
)
|
||||
def cli_storage_benchmark_run(pool, name, wait_flag):
|
||||
def cli_storage_benchmark_run(pool, wait_flag):
|
||||
"""
|
||||
Run a storage benchmark on POOL in the background.
|
||||
"""
|
||||
|
||||
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(
|
||||
CLI_CONFIG, pool, name, wait_flag
|
||||
)
|
||||
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)
|
||||
|
||||
if retcode and wait_flag:
|
||||
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
|
||||
|
|
|
@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
|||
|
||||
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
|
||||
|
||||
vm_states = ["start", "disable"]
|
||||
vm_states = ["start", "disable", "mirror"]
|
||||
vm_states.extend(
|
||||
[
|
||||
state
|
||||
for state in data.get("vms", {}).keys()
|
||||
if state not in ["total", "start", "disable"]
|
||||
if state not in ["total", "start", "disable", "mirror"]
|
||||
]
|
||||
)
|
||||
|
||||
|
@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
|||
continue
|
||||
if state in ["start"]:
|
||||
state_colour = ansii["green"]
|
||||
elif state in ["migrate", "disable", "provision"]:
|
||||
elif state in ["migrate", "disable", "provision", "mirror"]:
|
||||
state_colour = ansii["blue"]
|
||||
elif state in ["stop", "fail"]:
|
||||
state_colour = ansii["red"]
|
||||
|
|
|
@ -30,7 +30,6 @@ from requests_toolbelt.multipart.encoder import (
|
|||
|
||||
import pvc.lib.ansiprint as ansiprint
|
||||
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
|
||||
from pvc.cli.helpers import MAX_CONTENT_WIDTH
|
||||
|
||||
#
|
||||
# Supplemental functions
|
||||
|
@ -1725,17 +1724,15 @@ def format_list_snapshot(config, snapshot_list):
|
|||
#
|
||||
# Benchmark functions
|
||||
#
|
||||
def ceph_benchmark_run(config, pool, name, wait_flag):
|
||||
def ceph_benchmark_run(config, pool, wait_flag):
|
||||
"""
|
||||
Run a storage benchmark against {pool}
|
||||
|
||||
API endpoint: POST /api/v1/storage/ceph/benchmark
|
||||
API arguments: pool={pool}, name={name}
|
||||
API arguments: pool={pool}
|
||||
API schema: {message}
|
||||
"""
|
||||
params = {"pool": pool}
|
||||
if name:
|
||||
params["name"] = name
|
||||
response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
|
||||
|
||||
return get_wait_retdata(response, wait_flag)
|
||||
|
@ -1807,7 +1804,7 @@ def get_benchmark_list_results(benchmark_format, benchmark_data):
|
|||
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy(
|
||||
benchmark_data
|
||||
)
|
||||
elif benchmark_format == 1 or benchmark_format == 2:
|
||||
elif benchmark_format == 1:
|
||||
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json(
|
||||
benchmark_data
|
||||
)
|
||||
|
@ -2009,7 +2006,6 @@ def format_info_benchmark(config, benchmark_information):
|
|||
benchmark_matrix = {
|
||||
0: format_info_benchmark_legacy,
|
||||
1: format_info_benchmark_json,
|
||||
2: format_info_benchmark_json,
|
||||
}
|
||||
|
||||
benchmark_version = benchmark_information[0]["test_format"]
|
||||
|
@ -2344,15 +2340,12 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
if benchmark_information["benchmark_result"] == "Running":
|
||||
return "Benchmark test is still running."
|
||||
|
||||
benchmark_format = benchmark_information["test_format"]
|
||||
benchmark_details = benchmark_information["benchmark_result"]
|
||||
|
||||
# Format a nice output; do this line-by-line then concat the elements at the end
|
||||
ainformation = []
|
||||
ainformation.append(
|
||||
"{}Storage Benchmark details (format {}):{}".format(
|
||||
ansiprint.bold(), benchmark_format, ansiprint.end()
|
||||
)
|
||||
"{}Storage Benchmark details:{}".format(ansiprint.bold(), ansiprint.end())
|
||||
)
|
||||
|
||||
nice_test_name_map = {
|
||||
|
@ -2400,7 +2393,7 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
if element[1] != 0:
|
||||
useful_latency_tree.append(element)
|
||||
|
||||
max_rows = 5
|
||||
max_rows = 9
|
||||
if len(useful_latency_tree) > 9:
|
||||
max_rows = len(useful_latency_tree)
|
||||
elif len(useful_latency_tree) < 9:
|
||||
|
@ -2409,10 +2402,15 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
|
||||
# Format the static data
|
||||
overall_label = [
|
||||
"BW/s:",
|
||||
"IOPS:",
|
||||
"I/O:",
|
||||
"Time:",
|
||||
"Overall BW/s:",
|
||||
"Overall IOPS:",
|
||||
"Total I/O:",
|
||||
"Runtime (s):",
|
||||
"User CPU %:",
|
||||
"System CPU %:",
|
||||
"Ctx Switches:",
|
||||
"Major Faults:",
|
||||
"Minor Faults:",
|
||||
]
|
||||
while len(overall_label) < max_rows:
|
||||
overall_label.append("")
|
||||
|
@ -2421,149 +2419,68 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])),
|
||||
format_ops_tohuman(int(job_details[io_class]["iops"])),
|
||||
format_bytes_tohuman(int(job_details[io_class]["io_bytes"])),
|
||||
str(job_details["job_runtime"] / 1000) + "s",
|
||||
job_details["job_runtime"] / 1000,
|
||||
job_details["usr_cpu"],
|
||||
job_details["sys_cpu"],
|
||||
job_details["ctx"],
|
||||
job_details["majf"],
|
||||
job_details["minf"],
|
||||
]
|
||||
while len(overall_data) < max_rows:
|
||||
overall_data.append("")
|
||||
|
||||
cpu_label = [
|
||||
"Total:",
|
||||
"User:",
|
||||
"Sys:",
|
||||
"OSD:",
|
||||
"MON:",
|
||||
]
|
||||
while len(cpu_label) < max_rows:
|
||||
cpu_label.append("")
|
||||
|
||||
cpu_data = [
|
||||
(
|
||||
benchmark_details[test]["avg_cpu_util_percent"]["total"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
round(job_details["usr_cpu"], 2),
|
||||
round(job_details["sys_cpu"], 2),
|
||||
(
|
||||
benchmark_details[test]["avg_cpu_util_percent"]["ceph-osd"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
(
|
||||
benchmark_details[test]["avg_cpu_util_percent"]["ceph-mon"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
]
|
||||
while len(cpu_data) < max_rows:
|
||||
cpu_data.append("")
|
||||
|
||||
memory_label = [
|
||||
"Total:",
|
||||
"OSD:",
|
||||
"MON:",
|
||||
]
|
||||
while len(memory_label) < max_rows:
|
||||
memory_label.append("")
|
||||
|
||||
memory_data = [
|
||||
(
|
||||
benchmark_details[test]["avg_memory_util_percent"]["total"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
(
|
||||
benchmark_details[test]["avg_memory_util_percent"]["ceph-osd"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
(
|
||||
benchmark_details[test]["avg_memory_util_percent"]["ceph-mon"]
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
]
|
||||
while len(memory_data) < max_rows:
|
||||
memory_data.append("")
|
||||
|
||||
network_label = [
|
||||
"Total:",
|
||||
"Sent:",
|
||||
"Recv:",
|
||||
]
|
||||
while len(network_label) < max_rows:
|
||||
network_label.append("")
|
||||
|
||||
network_data = [
|
||||
(
|
||||
format_bytes_tohuman(
|
||||
int(benchmark_details[test]["avg_network_util_bps"]["total"])
|
||||
)
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
(
|
||||
format_bytes_tohuman(
|
||||
int(benchmark_details[test]["avg_network_util_bps"]["sent"])
|
||||
)
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
(
|
||||
format_bytes_tohuman(
|
||||
int(benchmark_details[test]["avg_network_util_bps"]["recv"])
|
||||
)
|
||||
if benchmark_format > 1
|
||||
else "N/A"
|
||||
),
|
||||
]
|
||||
while len(network_data) < max_rows:
|
||||
network_data.append("")
|
||||
|
||||
bandwidth_label = [
|
||||
"Min:",
|
||||
"Max:",
|
||||
"Mean:",
|
||||
"StdDev:",
|
||||
"Samples:",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
while len(bandwidth_label) < max_rows:
|
||||
bandwidth_label.append("")
|
||||
|
||||
bandwidth_data = [
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024)
|
||||
+ " / "
|
||||
+ format_ops_tohuman(int(job_details[io_class]["iops_min"])),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024)
|
||||
+ " / "
|
||||
+ format_ops_tohuman(int(job_details[io_class]["iops_max"])),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024)
|
||||
+ " / "
|
||||
+ format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024)
|
||||
+ " / "
|
||||
+ format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
|
||||
str(job_details[io_class]["bw_samples"])
|
||||
+ " / "
|
||||
+ str(job_details[io_class]["iops_samples"]),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024),
|
||||
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024),
|
||||
job_details[io_class]["bw_samples"],
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
while len(bandwidth_data) < max_rows:
|
||||
bandwidth_data.append("")
|
||||
|
||||
lat_label = [
|
||||
"Min:",
|
||||
"Max:",
|
||||
"Mean:",
|
||||
"StdDev:",
|
||||
iops_data = [
|
||||
format_ops_tohuman(int(job_details[io_class]["iops_min"])),
|
||||
format_ops_tohuman(int(job_details[io_class]["iops_max"])),
|
||||
format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
|
||||
format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
|
||||
job_details[io_class]["iops_samples"],
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
while len(lat_label) < max_rows:
|
||||
lat_label.append("")
|
||||
while len(iops_data) < max_rows:
|
||||
iops_data.append("")
|
||||
|
||||
lat_data = [
|
||||
int(job_details[io_class]["lat_ns"]["min"]) / 1000,
|
||||
int(job_details[io_class]["lat_ns"]["max"]) / 1000,
|
||||
int(job_details[io_class]["lat_ns"]["mean"]) / 1000,
|
||||
int(job_details[io_class]["lat_ns"]["stddev"]) / 1000,
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
while len(lat_data) < max_rows:
|
||||
lat_data.append("")
|
||||
|
@ -2572,119 +2489,98 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
lat_bucket_label = list()
|
||||
lat_bucket_data = list()
|
||||
for element in useful_latency_tree:
|
||||
lat_bucket_label.append(element[0] + ":" if element[0] else "")
|
||||
lat_bucket_data.append(round(float(element[1]), 2) if element[1] else "")
|
||||
while len(lat_bucket_label) < max_rows:
|
||||
lat_bucket_label.append("")
|
||||
while len(lat_bucket_data) < max_rows:
|
||||
lat_bucket_label.append("")
|
||||
lat_bucket_label.append(element[0])
|
||||
lat_bucket_data.append(element[1])
|
||||
|
||||
# Column default widths
|
||||
overall_label_length = 5
|
||||
overall_label_length = 0
|
||||
overall_column_length = 0
|
||||
cpu_label_length = 6
|
||||
cpu_column_length = 0
|
||||
memory_label_length = 6
|
||||
memory_column_length = 0
|
||||
network_label_length = 6
|
||||
network_column_length = 6
|
||||
bandwidth_label_length = 8
|
||||
bandwidth_column_length = 0
|
||||
latency_label_length = 7
|
||||
latency_column_length = 0
|
||||
bandwidth_label_length = 0
|
||||
bandwidth_column_length = 11
|
||||
iops_column_length = 4
|
||||
latency_column_length = 12
|
||||
latency_bucket_label_length = 0
|
||||
latency_bucket_column_length = 0
|
||||
|
||||
# Column layout:
|
||||
# Overall CPU Memory Network Bandwidth/IOPS Latency Percentiles
|
||||
# --------- ----- ------- -------- -------------- -------- ---------------
|
||||
# BW Total Total Total Min Min A
|
||||
# IOPS Usr OSD Send Max Max B
|
||||
# Time Sys MON Recv Mean Mean ...
|
||||
# Size OSD StdDev StdDev Z
|
||||
# MON Samples
|
||||
# General Bandwidth IOPS Latency Percentiles
|
||||
# --------- ---------- -------- -------- ---------------
|
||||
# Size Min Min Min A
|
||||
# BW Max Max Max B
|
||||
# IOPS Mean Mean Mean ...
|
||||
# Runtime StdDev StdDev StdDev Z
|
||||
# UsrCPU Samples Samples
|
||||
# SysCPU
|
||||
# CtxSw
|
||||
# MajFault
|
||||
# MinFault
|
||||
|
||||
# Set column widths
|
||||
for item in overall_label:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > overall_label_length:
|
||||
overall_label_length = _item_length
|
||||
|
||||
for item in overall_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > overall_column_length:
|
||||
overall_column_length = _item_length
|
||||
|
||||
for item in cpu_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > cpu_column_length:
|
||||
cpu_column_length = _item_length
|
||||
test_name_length = len(nice_test_name_map[test])
|
||||
if test_name_length > overall_label_length + overall_column_length:
|
||||
_diff = test_name_length - (overall_label_length + overall_column_length)
|
||||
overall_column_length += _diff
|
||||
|
||||
for item in memory_data:
|
||||
for item in bandwidth_label:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > memory_column_length:
|
||||
memory_column_length = _item_length
|
||||
|
||||
for item in network_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > network_column_length:
|
||||
network_column_length = _item_length
|
||||
if _item_length > bandwidth_label_length:
|
||||
bandwidth_label_length = _item_length
|
||||
|
||||
for item in bandwidth_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > bandwidth_column_length:
|
||||
bandwidth_column_length = _item_length
|
||||
|
||||
for item in iops_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > iops_column_length:
|
||||
iops_column_length = _item_length
|
||||
|
||||
for item in lat_data:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > latency_column_length:
|
||||
latency_column_length = _item_length
|
||||
|
||||
for item in lat_bucket_data:
|
||||
for item in lat_bucket_label:
|
||||
_item_length = len(str(item))
|
||||
if _item_length > latency_bucket_column_length:
|
||||
latency_bucket_column_length = _item_length
|
||||
if _item_length > latency_bucket_label_length:
|
||||
latency_bucket_label_length = _item_length
|
||||
|
||||
# Top row (Headers)
|
||||
ainformation.append(
|
||||
"{bold}{overall_label: <{overall_label_length}} {header_fill}{end_bold}".format(
|
||||
"{bold}\
|
||||
{overall_label: <{overall_label_length}} \
|
||||
{bandwidth_label: <{bandwidth_label_length}} \
|
||||
{bandwidth: <{bandwidth_length}} \
|
||||
{iops: <{iops_length}} \
|
||||
{latency: <{latency_length}} \
|
||||
{latency_bucket_label: <{latency_bucket_label_length}} \
|
||||
{latency_bucket} \
|
||||
{end_bold}".format(
|
||||
bold=ansiprint.bold(),
|
||||
end_bold=ansiprint.end(),
|
||||
overall_label=nice_test_name_map[test],
|
||||
overall_label_length=overall_label_length,
|
||||
header_fill="-"
|
||||
* (
|
||||
(MAX_CONTENT_WIDTH if MAX_CONTENT_WIDTH <= 120 else 120)
|
||||
- len(nice_test_name_map[test])
|
||||
- 4
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
ainformation.append(
|
||||
"{bold}\
|
||||
{overall_label: <{overall_label_length}} \
|
||||
{cpu_label: <{cpu_label_length}} \
|
||||
{memory_label: <{memory_label_length}} \
|
||||
{network_label: <{network_label_length}} \
|
||||
{bandwidth_label: <{bandwidth_label_length}} \
|
||||
{latency_label: <{latency_label_length}} \
|
||||
{latency_bucket_label: <{latency_bucket_label_length}}\
|
||||
{end_bold}".format(
|
||||
bold=ansiprint.bold(),
|
||||
end_bold=ansiprint.end(),
|
||||
overall_label="Overall",
|
||||
overall_label_length=overall_label_length + overall_column_length + 1,
|
||||
cpu_label="CPU (%)",
|
||||
cpu_label_length=cpu_label_length + cpu_column_length + 1,
|
||||
memory_label="Memory (%)",
|
||||
memory_label_length=memory_label_length + memory_column_length + 1,
|
||||
network_label="Network (bps)",
|
||||
network_label_length=network_label_length + network_column_length + 1,
|
||||
bandwidth_label="Bandwidth / IOPS",
|
||||
bandwidth_label_length=bandwidth_label_length
|
||||
+ bandwidth_column_length
|
||||
+ 1,
|
||||
latency_label="Latency (μs)",
|
||||
latency_label_length=latency_label_length + latency_column_length + 1,
|
||||
latency_bucket_label="Buckets (μs/%)",
|
||||
latency_bucket_label_length=latency_bucket_label_length
|
||||
+ latency_bucket_column_length,
|
||||
bandwidth_label="",
|
||||
bandwidth_label_length=bandwidth_label_length,
|
||||
bandwidth="Bandwidth/s",
|
||||
bandwidth_length=bandwidth_column_length,
|
||||
iops="IOPS",
|
||||
iops_length=iops_column_length,
|
||||
latency="Latency (μs)",
|
||||
latency_length=latency_column_length,
|
||||
latency_bucket_label="Latency Buckets (μs/%)",
|
||||
latency_bucket_label_length=latency_bucket_label_length,
|
||||
latency_bucket="",
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -2692,20 +2588,14 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
# Top row (Headers)
|
||||
ainformation.append(
|
||||
"{bold}\
|
||||
{overall_label: <{overall_label_length}} \
|
||||
{overall: <{overall_length}} \
|
||||
{cpu_label: <{cpu_label_length}} \
|
||||
{cpu: <{cpu_length}} \
|
||||
{memory_label: <{memory_label_length}} \
|
||||
{memory: <{memory_length}} \
|
||||
{network_label: <{network_label_length}} \
|
||||
{network: <{network_length}} \
|
||||
{bandwidth_label: <{bandwidth_label_length}} \
|
||||
{bandwidth: <{bandwidth_length}} \
|
||||
{latency_label: <{latency_label_length}} \
|
||||
{latency: <{latency_length}} \
|
||||
{latency_bucket_label: <{latency_bucket_label_length}} \
|
||||
{latency_bucket}\
|
||||
{overall_label: >{overall_label_length}} \
|
||||
{overall: <{overall_length}} \
|
||||
{bandwidth_label: >{bandwidth_label_length}} \
|
||||
{bandwidth: <{bandwidth_length}} \
|
||||
{iops: <{iops_length}} \
|
||||
{latency: <{latency_length}} \
|
||||
{latency_bucket_label: >{latency_bucket_label_length}} \
|
||||
{latency_bucket} \
|
||||
{end_bold}".format(
|
||||
bold="",
|
||||
end_bold="",
|
||||
|
@ -2713,24 +2603,12 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
overall_label_length=overall_label_length,
|
||||
overall=overall_data[idx],
|
||||
overall_length=overall_column_length,
|
||||
cpu_label=cpu_label[idx],
|
||||
cpu_label_length=cpu_label_length,
|
||||
cpu=cpu_data[idx],
|
||||
cpu_length=cpu_column_length,
|
||||
memory_label=memory_label[idx],
|
||||
memory_label_length=memory_label_length,
|
||||
memory=memory_data[idx],
|
||||
memory_length=memory_column_length,
|
||||
network_label=network_label[idx],
|
||||
network_label_length=network_label_length,
|
||||
network=network_data[idx],
|
||||
network_length=network_column_length,
|
||||
bandwidth_label=bandwidth_label[idx],
|
||||
bandwidth_label_length=bandwidth_label_length,
|
||||
bandwidth=bandwidth_data[idx],
|
||||
bandwidth_length=bandwidth_column_length,
|
||||
latency_label=lat_label[idx],
|
||||
latency_label_length=latency_label_length,
|
||||
iops=iops_data[idx],
|
||||
iops_length=iops_column_length,
|
||||
latency=lat_data[idx],
|
||||
latency_length=latency_column_length,
|
||||
latency_bucket_label=lat_bucket_label[idx],
|
||||
|
@ -2739,4 +2617,4 @@ def format_info_benchmark_json(config, benchmark_information):
|
|||
)
|
||||
)
|
||||
|
||||
return "\n".join(ainformation) + "\n"
|
||||
return "\n".join(ainformation)
|
||||
|
|
|
@ -1760,6 +1760,7 @@ def format_info(config, domain_information, long_output):
|
|||
"provision": ansiprint.blue(),
|
||||
"restore": ansiprint.blue(),
|
||||
"import": ansiprint.blue(),
|
||||
"mirror": ansiprint.blue(),
|
||||
}
|
||||
ainformation.append(
|
||||
"{}State:{} {}{}{}".format(
|
||||
|
|
|
@ -19,34 +19,31 @@
|
|||
#
|
||||
###############################################################################
|
||||
|
||||
import os
|
||||
import psutil
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import subprocess
|
||||
|
||||
from datetime import datetime
|
||||
from json import loads, dumps
|
||||
from time import sleep
|
||||
|
||||
from daemon_lib.celery import start, fail, log_info, update, finish
|
||||
|
||||
import daemon_lib.common as pvc_common
|
||||
import daemon_lib.ceph as pvc_ceph
|
||||
|
||||
|
||||
# Define the current test format
|
||||
TEST_FORMAT = 2
|
||||
TEST_FORMAT = 1
|
||||
|
||||
|
||||
# We run a total of 8 tests, to give a generalized idea of performance on the cluster:
|
||||
# 1. A sequential read test of 64GB with a 4M block size
|
||||
# 2. A sequential write test of 64GB with a 4M block size
|
||||
# 3. A random read test of 64GB with a 4M block size
|
||||
# 4. A random write test of 64GB with a 4M block size
|
||||
# 5. A random read test of 64GB with a 256k block size
|
||||
# 6. A random write test of 64GB with a 256k block size
|
||||
# 7. A random read test of 64GB with a 4k block size
|
||||
# 8. A random write test of 64GB with a 4k block size
|
||||
# 1. A sequential read test of 8GB with a 4M block size
|
||||
# 2. A sequential write test of 8GB with a 4M block size
|
||||
# 3. A random read test of 8GB with a 4M block size
|
||||
# 4. A random write test of 8GB with a 4M block size
|
||||
# 5. A random read test of 8GB with a 256k block size
|
||||
# 6. A random write test of 8GB with a 256k block size
|
||||
# 7. A random read test of 8GB with a 4k block size
|
||||
# 8. A random write test of 8GB with a 4k block size
|
||||
# Taken together, these 8 results should give a very good indication of the overall storage performance
|
||||
# for a variety of workloads.
|
||||
test_matrix = {
|
||||
|
@ -103,7 +100,7 @@ test_matrix = {
|
|||
|
||||
# Specify the benchmark volume name and size
|
||||
benchmark_volume_name = "pvcbenchmark"
|
||||
benchmark_volume_size = "64G"
|
||||
benchmark_volume_size = "8G"
|
||||
|
||||
|
||||
#
|
||||
|
@ -229,7 +226,7 @@ def cleanup_benchmark_volume(
|
|||
|
||||
|
||||
def run_benchmark_job(
|
||||
config, test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
|
||||
test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
|
||||
):
|
||||
test_spec = test_matrix[test]
|
||||
log_info(None, f"Running test '{test}'")
|
||||
|
@ -259,165 +256,31 @@ def run_benchmark_job(
|
|||
)
|
||||
|
||||
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
|
||||
|
||||
# Run the fio command manually instead of using our run_os_command wrapper
|
||||
# This will help us gather statistics about this node while it's running
|
||||
process = subprocess.Popen(
|
||||
fio_cmd.split(),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
|
||||
# Wait 15 seconds for the test to start
|
||||
log_info(None, "Waiting 15 seconds for test resource stabilization")
|
||||
sleep(15)
|
||||
|
||||
# Set up function to get process CPU utilization by name
|
||||
def get_cpu_utilization_by_name(process_name):
|
||||
cpu_usage = 0
|
||||
for proc in psutil.process_iter(["name", "cpu_percent"]):
|
||||
if proc.info["name"] == process_name:
|
||||
cpu_usage += proc.info["cpu_percent"]
|
||||
return cpu_usage
|
||||
|
||||
# Set up function to get process memory utilization by name
|
||||
def get_memory_utilization_by_name(process_name):
|
||||
memory_usage = 0
|
||||
for proc in psutil.process_iter(["name", "memory_percent"]):
|
||||
if proc.info["name"] == process_name:
|
||||
memory_usage += proc.info["memory_percent"]
|
||||
return memory_usage
|
||||
|
||||
# Set up function to get network traffic utilization in bps
|
||||
def get_network_traffic_bps(interface, duration=1):
|
||||
# Get initial network counters
|
||||
net_io_start = psutil.net_io_counters(pernic=True)
|
||||
if interface not in net_io_start:
|
||||
return None, None
|
||||
|
||||
stats_start = net_io_start[interface]
|
||||
bytes_sent_start = stats_start.bytes_sent
|
||||
bytes_recv_start = stats_start.bytes_recv
|
||||
|
||||
# Wait for the specified duration
|
||||
sleep(duration)
|
||||
|
||||
# Get final network counters
|
||||
net_io_end = psutil.net_io_counters(pernic=True)
|
||||
stats_end = net_io_end[interface]
|
||||
bytes_sent_end = stats_end.bytes_sent
|
||||
bytes_recv_end = stats_end.bytes_recv
|
||||
|
||||
# Calculate bytes per second
|
||||
bytes_sent_per_sec = (bytes_sent_end - bytes_sent_start) / duration
|
||||
bytes_recv_per_sec = (bytes_recv_end - bytes_recv_start) / duration
|
||||
|
||||
# Convert to bits per second (bps)
|
||||
bits_sent_per_sec = bytes_sent_per_sec * 8
|
||||
bits_recv_per_sec = bytes_recv_per_sec * 8
|
||||
bits_total_per_sec = bits_sent_per_sec + bits_recv_per_sec
|
||||
|
||||
return bits_sent_per_sec, bits_recv_per_sec, bits_total_per_sec
|
||||
|
||||
log_info(None, f"Starting system resource polling for test '{test}'")
|
||||
storage_interface = config["storage_dev"]
|
||||
total_cpus = psutil.cpu_count(logical=True)
|
||||
ticks = 1
|
||||
osd_cpu_utilization = 0
|
||||
osd_memory_utilization = 0
|
||||
mon_cpu_utilization = 0
|
||||
mon_memory_utilization = 0
|
||||
total_cpu_utilization = 0
|
||||
total_memory_utilization = 0
|
||||
storage_sent_bps = 0
|
||||
storage_recv_bps = 0
|
||||
storage_total_bps = 0
|
||||
|
||||
while process.poll() is None:
|
||||
# Do collection of statistics like network bandwidth and cpu utilization
|
||||
current_osd_cpu_utilization = get_cpu_utilization_by_name("ceph-osd")
|
||||
current_osd_memory_utilization = get_memory_utilization_by_name("ceph-osd")
|
||||
current_mon_cpu_utilization = get_cpu_utilization_by_name("ceph-mon")
|
||||
current_mon_memory_utilization = get_memory_utilization_by_name("ceph-mon")
|
||||
current_total_cpu_utilization = psutil.cpu_percent(interval=1)
|
||||
current_total_memory_utilization = psutil.virtual_memory().percent
|
||||
(
|
||||
current_storage_sent_bps,
|
||||
current_storage_recv_bps,
|
||||
current_storage_total_bps,
|
||||
) = get_network_traffic_bps(storage_interface)
|
||||
# Recheck if the process is done yet; if it's not, we add the values and increase the ticks
|
||||
# This helps ensure that if the process finishes earlier than the longer polls above,
|
||||
# this particular tick isn't counted which can skew the average
|
||||
if process.poll() is None:
|
||||
osd_cpu_utilization += current_osd_cpu_utilization
|
||||
osd_memory_utilization += current_osd_memory_utilization
|
||||
mon_cpu_utilization += current_mon_cpu_utilization
|
||||
mon_memory_utilization += current_mon_memory_utilization
|
||||
total_cpu_utilization += current_total_cpu_utilization
|
||||
total_memory_utilization += current_total_memory_utilization
|
||||
storage_sent_bps += current_storage_sent_bps
|
||||
storage_recv_bps += current_storage_recv_bps
|
||||
storage_total_bps += current_storage_total_bps
|
||||
ticks += 1
|
||||
|
||||
# Get the 1-minute load average and CPU utilization, which covers the test duration
|
||||
load1, _, _ = os.getloadavg()
|
||||
load1 = round(load1, 2)
|
||||
|
||||
# Calculate the average CPU utilization values over the runtime
|
||||
# Divide the OSD and MON CPU utilization by the total number of CPU cores, because
|
||||
# the total is divided this way
|
||||
avg_osd_cpu_utilization = round(osd_cpu_utilization / ticks / total_cpus, 2)
|
||||
avg_osd_memory_utilization = round(osd_memory_utilization / ticks, 2)
|
||||
avg_mon_cpu_utilization = round(mon_cpu_utilization / ticks / total_cpus, 2)
|
||||
avg_mon_memory_utilization = round(mon_memory_utilization / ticks, 2)
|
||||
avg_total_cpu_utilization = round(total_cpu_utilization / ticks, 2)
|
||||
avg_total_memory_utilization = round(total_memory_utilization / ticks, 2)
|
||||
avg_storage_sent_bps = round(storage_sent_bps / ticks, 2)
|
||||
avg_storage_recv_bps = round(storage_recv_bps / ticks, 2)
|
||||
avg_storage_total_bps = round(storage_total_bps / ticks, 2)
|
||||
|
||||
stdout, stderr = process.communicate()
|
||||
retcode = process.returncode
|
||||
|
||||
resource_data = {
|
||||
"avg_cpu_util_percent": {
|
||||
"total": avg_total_cpu_utilization,
|
||||
"ceph-mon": avg_mon_cpu_utilization,
|
||||
"ceph-osd": avg_osd_cpu_utilization,
|
||||
},
|
||||
"avg_memory_util_percent": {
|
||||
"total": avg_total_memory_utilization,
|
||||
"ceph-mon": avg_mon_memory_utilization,
|
||||
"ceph-osd": avg_osd_memory_utilization,
|
||||
},
|
||||
"avg_network_util_bps": {
|
||||
"sent": avg_storage_sent_bps,
|
||||
"recv": avg_storage_recv_bps,
|
||||
"total": avg_storage_total_bps,
|
||||
},
|
||||
}
|
||||
|
||||
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
|
||||
try:
|
||||
jstdout = loads(stdout)
|
||||
if retcode:
|
||||
raise
|
||||
except Exception:
|
||||
return None, None
|
||||
cleanup(
|
||||
job_name,
|
||||
db_conn=db_conn,
|
||||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
fail(
|
||||
None,
|
||||
f"Failed to run fio test '{test}': {stderr}",
|
||||
)
|
||||
|
||||
return resource_data, jstdout
|
||||
return jstdout
|
||||
|
||||
|
||||
def worker_run_benchmark(zkhandler, celery, config, pool, name):
|
||||
def worker_run_benchmark(zkhandler, celery, config, pool):
|
||||
# Phase 0 - connect to databases
|
||||
if not name:
|
||||
cur_time = datetime.now().isoformat(timespec="seconds")
|
||||
cur_primary = zkhandler.read("base.config.primary_node")
|
||||
job_name = f"{cur_time}_{cur_primary}"
|
||||
else:
|
||||
job_name = name
|
||||
cur_time = datetime.now().isoformat(timespec="seconds")
|
||||
cur_primary = zkhandler.read("base.config.primary_node")
|
||||
job_name = f"{cur_time}_{cur_primary}"
|
||||
|
||||
current_stage = 0
|
||||
total_stages = 13
|
||||
|
@ -495,8 +358,7 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
|
|||
total=total_stages,
|
||||
)
|
||||
|
||||
resource_data, fio_data = run_benchmark_job(
|
||||
config,
|
||||
results[test] = run_benchmark_job(
|
||||
test,
|
||||
pool,
|
||||
job_name=job_name,
|
||||
|
@ -504,25 +366,6 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
|
|||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
if resource_data is None or fio_data is None:
|
||||
cleanup_benchmark_volume(
|
||||
pool,
|
||||
job_name=job_name,
|
||||
db_conn=db_conn,
|
||||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
cleanup(
|
||||
job_name,
|
||||
db_conn=db_conn,
|
||||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
fail(
|
||||
None,
|
||||
f"Failed to run fio test '{test}'",
|
||||
)
|
||||
results[test] = {**resource_data, **fio_data}
|
||||
|
||||
# Phase 3 - cleanup
|
||||
current_stage += 1
|
||||
|
|
|
@ -1160,6 +1160,7 @@ def get_resource_metrics(zkhandler):
|
|||
"fail": 8,
|
||||
"import": 9,
|
||||
"restore": 10,
|
||||
"mirror": 99,
|
||||
}
|
||||
state = vm["state"]
|
||||
output_lines.append(
|
||||
|
|
|
@ -85,6 +85,7 @@ vm_state_combinations = [
|
|||
"provision",
|
||||
"import",
|
||||
"restore",
|
||||
"mirror",
|
||||
]
|
||||
ceph_osd_state_combinations = [
|
||||
"up,in",
|
||||
|
|
|
@ -24,6 +24,7 @@ import re
|
|||
import os.path
|
||||
import lxml.objectify
|
||||
import lxml.etree
|
||||
import subprocess
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
|
@ -580,7 +581,7 @@ def rename_vm(zkhandler, domain, new_domain):
|
|||
|
||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||
state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if state not in ["stop", "disable"]:
|
||||
if state not in ["stop", "disable", "mirror"]:
|
||||
return (
|
||||
False,
|
||||
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
|
||||
|
@ -1125,6 +1126,7 @@ def get_list(
|
|||
"migrate",
|
||||
"unmigrate",
|
||||
"provision",
|
||||
"mirror",
|
||||
]
|
||||
if state not in valid_states:
|
||||
return False, 'VM state "{}" is not valid.'.format(state)
|
||||
|
@ -1903,10 +1905,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
|||
|
||||
# Check that the domain is stopped (unless force_unlock is set)
|
||||
domain_state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if not force_unlock and domain_state not in ["stop", "disable", "fail"]:
|
||||
if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
|
||||
fail(
|
||||
celery,
|
||||
f"VM state {domain_state} not in [stop, disable, fail] and not forcing",
|
||||
f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
|
||||
)
|
||||
return False
|
||||
|
||||
|
@ -2329,7 +2331,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
|
|||
|
||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||
state = zkhandler.read(("domain.state", dom_uuid))
|
||||
if state not in ["stop", "disable"]:
|
||||
if state not in ["stop", "disable", "mirror"]:
|
||||
fail(
|
||||
celery,
|
||||
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
|
||||
|
@ -3118,3 +3120,389 @@ def vm_worker_import_snapshot(
|
|||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
|
||||
def vm_worker_send_snapshot(
|
||||
zkhandler,
|
||||
celery,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=True,
|
||||
incremental_parent=None,
|
||||
destination_storage_pool=None,
|
||||
):
|
||||
import requests
|
||||
from packaging.version import parse as parse_version
|
||||
|
||||
current_stage = 0
|
||||
total_stages = 1
|
||||
start(
|
||||
celery,
|
||||
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Validate that VM exists in cluster
|
||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||
if not dom_uuid:
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find VM '{domain}' in the cluster",
|
||||
)
|
||||
return False
|
||||
|
||||
# Get our side's VM configuration details
|
||||
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
|
||||
if not isinstance(vm_detail, dict):
|
||||
fail(celery, f"VM listing returned invalid data: {vm_detail}",)
|
||||
return False
|
||||
|
||||
# Check if the snapshot exists
|
||||
if not zkhandler.exists(
|
||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if the incremental parent exists
|
||||
if incremental_parent is not None and not zkhandler.exists(
|
||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||
)
|
||||
return False
|
||||
|
||||
# Validate that the destination cluster can be reached
|
||||
destination_api_timeout = (3.05, 172800)
|
||||
destination_api_headers = {
|
||||
"X-Api-Key": destination_api_key,
|
||||
"Content-Type": "application/octet-stream",
|
||||
}
|
||||
|
||||
try:
|
||||
# Hit the API root; this should return "PVC API version x"
|
||||
response = requests.get(
|
||||
f"{destination_api_uri}/",
|
||||
timeout=destination_api_timeout,
|
||||
headers=None,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
if "PVC API" not in response.json().get("message"):
|
||||
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API timed out: {e}",
|
||||
)
|
||||
return False
|
||||
except ValueError as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API is not valid: {e}",
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Connection to remote API failed: {e}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Hit the API "/status" endpoint to validate API key and cluster status
|
||||
response = requests.get(
|
||||
f"{destination_api_uri}/status",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
destination_cluster_status = response.json()
|
||||
current_destination_pvc_version = destination_cluster_status.get(
|
||||
"pvc_version", None
|
||||
)
|
||||
if current_destination_pvc_version is None:
|
||||
fail(
|
||||
celery,
|
||||
"Connection to remote API failed: no PVC version information returned",
|
||||
)
|
||||
return False
|
||||
|
||||
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
|
||||
if parse_version(current_destination_pvc_version) < parse_version(
|
||||
expected_destination_pvc_version
|
||||
):
|
||||
fail(
|
||||
celery,
|
||||
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if the VM already exists on the remote
|
||||
response = requests.get(
|
||||
f"{destination_api_uri}/vm/{domain}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
destination_vm_status = response.json()
|
||||
current_destination_vm_state = destination_vm_status.get(
|
||||
"state", None
|
||||
)
|
||||
if current_destination_vm_state is not None and current_destination_vm_state != "mirror":
|
||||
fail(
|
||||
celery,
|
||||
f"Remote PVC VM exists and is not a mirror",
|
||||
)
|
||||
return False
|
||||
|
||||
# Get details about VM snapshot
|
||||
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
|
||||
[
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.name",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.timestamp",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.xml",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
(
|
||||
(
|
||||
"domain.snapshots",
|
||||
dom_uuid,
|
||||
"domain_snapshot.rbd_snapshots",
|
||||
snapshot_name,
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
|
||||
|
||||
# Get details about remote VM snapshots
|
||||
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
|
||||
latest_destination_vm_snapshot = destination_vm_snapshots[0]
|
||||
|
||||
# Check if this snapshot is in the remote list already
|
||||
if snapshot_name in [s['name'] for s in destination_vm_snapshots]:
|
||||
fail(
|
||||
celery,
|
||||
f"Snapshot {snapshot_name} already exists on the target",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if this snapshot is older than the latest remote VM snapshot
|
||||
if snapshot_timestamp < latest_destination_vm_snapshot["timestamp"]:
|
||||
fail(
|
||||
celery,
|
||||
f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Check that our incremental parent exists on the remote VM
|
||||
if incremental_parent is not None:
|
||||
if increment_parent not in [s['name'] for s in destination_vm_snapshots]:
|
||||
fail(
|
||||
celery,
|
||||
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
|
||||
)
|
||||
return False
|
||||
|
||||
# Set send type
|
||||
send_type = "incremental" if incremental_parent is not None else "full"
|
||||
|
||||
# Begin send, set stages
|
||||
total_stages = 2 + (2 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
|
||||
|
||||
# Create the block devices on the remote side if this is a new VM send
|
||||
for rbd in [r for r in vm_detail["disks"] if r['type'] == 'rbd']:
|
||||
pool, volume = rbd["name"].split('/')
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Preparing remote volume for {rbd}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Get the storage volume details
|
||||
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
|
||||
if not retcode or len(retdata) != 1:
|
||||
if len(retdata) < 1:
|
||||
error_message = f"No detail returned for volume {rbd}"
|
||||
elif len(retdata) > 1:
|
||||
error_message = f"Multiple details returned for volume {rbd}"
|
||||
else:
|
||||
error_message = f"Error getting details for volume {rbd}"
|
||||
fail(
|
||||
celery,
|
||||
error_message,
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||
except Exception as e:
|
||||
error_message = f"Failed to get volume size for {rbd}: {e}"
|
||||
|
||||
if destination_storage_pool is not None:
|
||||
pool = destination_storage_pool
|
||||
|
||||
if current_destination_vm_state is None:
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Creating remote volume {pool}/{volume} for {rbd}@{snapshot_name}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Check if the volume exists on the target
|
||||
response = requests.get(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=None,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
if response.status_code != 404:
|
||||
fail(
|
||||
celery,
|
||||
f"Remote storage pool {pool} already contains volume {volume}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Create the volume on the target
|
||||
params = {
|
||||
"size": size_bytes,
|
||||
}
|
||||
response = requests.post(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=destination_api_headers,
|
||||
params=params,
|
||||
data=None,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
destination_volume_create_status = response.json()
|
||||
if response.status_code != 200:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to create volume {pool}/{volume} on target: {destination_volume_create_status['message']}",
|
||||
)
|
||||
return False
|
||||
|
||||
# Send the volume to the remote
|
||||
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
|
||||
cluster.connect()
|
||||
ioctx = cluster.open_ioctx(pool)
|
||||
image = rbd.Image(ioctx, name=volume, snapshot=snapshot, read_only=True)
|
||||
size = image.size()
|
||||
chunk_size_mb = 128
|
||||
|
||||
if incremental_parent is not None:
|
||||
# Diff between incremental_parent and snapshot
|
||||
def diff_chunker():
|
||||
def diff_cb(offset, length, exists):
|
||||
""" Callback to handle diff regions """
|
||||
if exists:
|
||||
data = image.read(offset, length)
|
||||
yield (offset.to_bytes(8, 'big') + length.to_bytes(8, 'big') + data)
|
||||
|
||||
image.set_snap(incremental_parent)
|
||||
image.diff_iterate(0, size, source_snapshot, diff_cb)
|
||||
data_stream = diff_chunker()
|
||||
celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd}")
|
||||
else:
|
||||
# Full image transfer
|
||||
def chunker():
|
||||
d_start = time.time()
|
||||
chunk_size = 1024 * 1024 * chunk_size_mb
|
||||
current_chunk = 0
|
||||
while current_chunk < size:
|
||||
t_end = time.time()
|
||||
t_tot = t_end - t_start
|
||||
chunk = image.read(current_chunk, chunk_size)
|
||||
yield chunk
|
||||
current_chunk += chunk_size
|
||||
data_stream = chunker()
|
||||
celery_message = f"Sending full image of {rbd}@{snapshot_name}"
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Sending volume {rbd}@{snapshot_name} to target ({send_type})",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
send_params = {
|
||||
'pool': pool,
|
||||
'volume': volume,
|
||||
'snapshot': snapshot_name,
|
||||
'size': size,
|
||||
'source_snapshot': incremental_parent,
|
||||
}
|
||||
send_headers = {
|
||||
'X-Api-Key': destination_api_key,,
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'Transfer-Encoding': None # Disable chunked transfer encoding
|
||||
}
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||
timeout=destination_api_timeout,
|
||||
headers=send_headers,
|
||||
params=send_params,
|
||||
data=data_stream,
|
||||
verify=destination_api_verify_ssl,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to send snapshot: {e}",
|
||||
)
|
||||
return False
|
||||
finally:
|
||||
image.close()
|
||||
ioctx.close()
|
||||
cluster.shutdown()
|
||||
|
||||
# Send the VM configuration
|
||||
if current_destination_vm_state is None:
|
||||
# This is a new VM, so define it
|
||||
#response = requests.post()
|
||||
else:
|
||||
# This is a modification
|
||||
#response = requests.post()
|
||||
|
|
|
@ -33,6 +33,7 @@ from daemon_lib.vm import (
|
|||
vm_worker_rollback_snapshot,
|
||||
vm_worker_export_snapshot,
|
||||
vm_worker_import_snapshot,
|
||||
vm_worker_send_snapshot,
|
||||
)
|
||||
from daemon_lib.ceph import (
|
||||
osd_worker_add_osd,
|
||||
|
@ -96,12 +97,12 @@ def create_vm(
|
|||
|
||||
|
||||
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
|
||||
def storage_benchmark(self, pool=None, name=None, run_on="primary"):
|
||||
def storage_benchmark(self, pool=None, run_on="primary"):
|
||||
@ZKConnection(config)
|
||||
def run_storage_benchmark(zkhandler, self, pool, name):
|
||||
return worker_run_benchmark(zkhandler, self, config, pool, name)
|
||||
def run_storage_benchmark(zkhandler, self, pool):
|
||||
return worker_run_benchmark(zkhandler, self, config, pool)
|
||||
|
||||
return run_storage_benchmark(self, pool, name)
|
||||
return run_storage_benchmark(self, pool)
|
||||
|
||||
|
||||
@celery.task(name="cluster.autobackup", bind=True, routing_key="run_on")
|
||||
|
@ -227,6 +228,39 @@ def vm_import_snapshot(
|
|||
)
|
||||
|
||||
|
||||
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
|
||||
def vm_send_snapshot(
|
||||
self,
|
||||
domain=None,
|
||||
snapshot_name=None,
|
||||
destination_api_uri="",
|
||||
destination_api_key="",
|
||||
destination_api_verify_ssl=True,
|
||||
incremental_parent=None,
|
||||
destination_storage_pool=None,
|
||||
run_on="primary",
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_vm_send_snapshot(
|
||||
zkhandler, self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=True, incremental_parent=None, destination_storage_pool=None,
|
||||
):
|
||||
return vm_worker_send_snapshot(
|
||||
zkhandler,
|
||||
self,
|
||||
domain,
|
||||
snapshot_name,
|
||||
destination_api_uri,
|
||||
destination_api_key,
|
||||
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||
incremental_parent=incremental_parent,
|
||||
destination_storage_pool=destination_storage_pool,
|
||||
)
|
||||
|
||||
return run_vm_send_snapshot(
|
||||
self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, incremental_parent=incremental_parent, destination_storage_pool=destination_storage_pool
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
||||
def osd_add(
|
||||
self,
|
||||
|
|
Loading…
Reference in New Issue