Compare commits

..

5 Commits

13 changed files with 1029 additions and 561 deletions

View File

@ -26,6 +26,6 @@ from os import path
current_dir = path.dirname(path.abspath(__file__)) current_dir = path.dirname(path.abspath(__file__))
sys.path.append(current_dir) sys.path.append(current_dir)
import pvcapid.Daemon # noqa: F401, E402 import pvcapid.Daemon # noqa: F401
pvcapid.Daemon.entrypoint() pvcapid.Daemon.entrypoint()

View File

@ -19,6 +19,8 @@
# #
############################################################################### ###############################################################################
import sys
import os
import subprocess import subprocess
from ssl import SSLContext, TLSVersion from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
@ -35,7 +37,6 @@ API_VERSION = 1.0
# Helper Functions # Helper Functions
########################################################## ##########################################################
def strtobool(stringv): def strtobool(stringv):
if stringv is None: if stringv is None:
return False return False
@ -61,7 +62,6 @@ config["daemon_version"] = version
# Flask App Creation for Gunicorn # Flask App Creation for Gunicorn
########################################################## ##########################################################
def create_app(): def create_app():
""" """
Create and return the Flask app and SSL context if necessary. Create and return the Flask app and SSL context if necessary.
@ -101,9 +101,8 @@ def create_app():
# Entrypoint # Entrypoint
########################################################## ##########################################################
def entrypoint(): def entrypoint():
if config["debug"]: if config['debug']:
app = create_app() app = create_app()
if config["api_ssl_enabled"]: if config["api_ssl_enabled"]:
@ -125,30 +124,21 @@ def entrypoint():
else: else:
# Build the command to run Gunicorn # Build the command to run Gunicorn
gunicorn_cmd = [ gunicorn_cmd = [
"gunicorn", 'gunicorn',
"--workers", '--workers', '1',
"1", '--threads', '8',
"--threads", '--timeout', '86400',
"8", '--bind', '{}:{}'.format(config["api_listen_address"], config["api_listen_port"]),
"--timeout", 'pvcapid.Daemon:create_app()',
"86400", '--log-level', 'info',
"--bind", '--access-logfile', '-',
"{}:{}".format(config["api_listen_address"], config["api_listen_port"]), '--error-logfile', '-',
"pvcapid.Daemon:create_app()",
"--log-level",
"info",
"--access-logfile",
"-",
"--error-logfile",
"-",
] ]
if config["api_ssl_enabled"]: if config["api_ssl_enabled"]:
gunicorn_cmd += [ gunicorn_cmd += [
"--certfile", '--certfile', config["api_ssl_cert_file"],
config["api_ssl_cert_file"], '--keyfile', config["api_ssl_key_file"]
"--keyfile",
config["api_ssl_key_file"],
] ]
# Run Gunicorn # Run Gunicorn

View File

@ -2801,7 +2801,7 @@ class API_VM_Locks(Resource):
- vm - vm
responses: responses:
202: 202:
description: OK description: Accepted
schema: schema:
type: string type: string
description: The Celery job ID of the task description: The Celery job ID of the task
@ -2924,11 +2924,11 @@ class API_VM_Device(Resource):
required: true required: true
description: The raw Libvirt XML definition of the device to attach description: The raw Libvirt XML definition of the device to attach
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -2978,11 +2978,11 @@ class API_VM_Device(Resource):
required: true required: true
description: The raw Libvirt XML definition of the device to detach description: The raw Libvirt XML definition of the device to detach
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -3234,11 +3234,11 @@ class API_VM_Snapshot(Resource):
required: false required: false
description: A custom name for the snapshot instead of autogeneration by date description: A custom name for the snapshot instead of autogeneration by date
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3292,11 +3292,11 @@ class API_VM_Snapshot(Resource):
required: true required: true
description: The name of the snapshot to remove description: The name of the snapshot to remove
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3356,11 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
required: true required: true
description: The name of the snapshot to roll back to description: The name of the snapshot to roll back to
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3436,15 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
description: The absolute file path to export the snapshot to on the active primary coordinator description: The absolute file path to export the snapshot to on the active primary coordinator
- in: query - in: query
name: incremental_parent name: incremental_parent
type: boolean type: string
required: false required: false
description: A snapshot name to generate an incremental diff from description: A snapshot name to generate an incremental diff from
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3529,11 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
default: true default: true
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3572,6 +3572,220 @@ class API_VM_Snapshot_Import(Resource):
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import") api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
# /vm/<vm>/snapshot/send
class API_VM_Snapshot_Send(Resource):
@RequestParser(
[
{
"name": "snapshot_name",
"required": True,
"helptext": "A snapshot name must be specified",
},
{
"name": "destination_api_uri",
"required": True,
"helptext": "A destination API URI must be specified",
},
{
"name": "destination_api_key",
"required": True,
"helptext": "A destination API key must be specified",
},
{
"name": "destination_api_verify_ssl",
"required": False,
},
{
"name": "incremental_parent",
"required": False,
},
{
"name": "destination_storage_pool",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Send a snapshot of a VM's disks and configuration to another PVC cluster
---
tags:
- vm
parameters:
- in: query
name: snapshot_name
type: string
required: true
description: The name of the snapshot to export (must exist)
- in: query
name: destination_api_uri
type: string
required: true
description: The base API URI of the destination PVC cluster (with prefix if applicable)
- in: query
name: destination_api_key
type: string
required: true
description: The API authentication key of the destination PVC cluster
- in: query
name: destination_api_verify_ssl
type: boolean
required: false
default: true
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
- in: query
name: incremental_parent
type: string
required: false
description: A snapshot name to generate an incremental diff from; incremental send only if unset
- in: query
name: destination_storage_pool
type: string
required: false
default: source storage pool name
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
responses:
202:
description: Accepted
schema:
type: string
description: The Celery job ID of the task
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
snapshot_name = reqargs.get("snapshot_name", None)
destination_api_uri = reqargs.get("destination_api_uri", None)
destination_api_key = reqargs.get("destination_api_key", None)
destination_api_verify_ssl = vool(strtobool(reqargs.get("destination_api_verify_ssl", "true")))
incremental_parent = reqargs.get("incremental_parent", None)
destination_storage_pool = reqargs.get("destination_storage_pool", None)
task = run_celery_task(
"vm.send_snapshot",
domain=vm,
snapshot_name=snapshot_name,
destination_api_uri=destination_api_uri,
destination_api_key=destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
run_on="primary",
)
return (
{
"task_id": task.id,
"task_name": "vm.send_snapshot",
"run_on": f"{get_primary_node()} (primary)",
},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
# /vm/<vm>/snapshot/receive/block
class API_VM_Snapshot_Receive_Block(Resource):
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "size",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: size
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
- in: query
name: source_snapshot
type: string
required: false
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block(
reqargs.get("pool"),
reqargs.get("volume") + "_recv",
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request.stream,
source_snapshot=reqargs.get("source_snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
# /vm/autobackup # /vm/autobackup
class API_VM_Autobackup_Root(Resource): class API_VM_Autobackup_Root(Resource):
@RequestParser( @RequestParser(
@ -3601,14 +3815,11 @@ class API_VM_Autobackup_Root(Resource):
type: string type: string
example: "user@domain.tld" example: "user@domain.tld"
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
properties: description: The Celery job ID of the task
task_id:
type: string
description: Task ID for the provisioner Celery worker
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5058,7 +5269,95 @@ class API_Storage_Ceph_Benchmark(Resource):
description: The PVC benchmark format of the results description: The PVC benchmark format of the results
benchmark_result: benchmark_result:
type: object type: object
description: A benchmark test result; format not documented due to complexity description: A format 0 test result
properties:
test_name:
type: object
properties:
overall:
type: object
properties:
iosize:
type: string (integer)
description: The total size of the benchmark data
bandwidth:
type: string (integer)
description: The average bandwidth (KiB/s)
iops:
type: string (integer)
description: The average IOPS
runtime:
type: string (integer)
description: The total test time in milliseconds
latency:
type: object
properties:
min:
type: string (integer)
description: The minimum latency measurement
max:
type: string (integer)
description: The maximum latency measurement
mean:
type: string (float)
description: The mean latency measurement
stdev:
type: string (float)
description: The standard deviation of latency
bandwidth:
type: object
properties:
min:
type: string (integer)
description: The minimum bandwidth (KiB/s) measurement
max:
type: string (integer)
description: The maximum bandwidth (KiB/s) measurement
mean:
type: string (float)
description: The mean bandwidth (KiB/s) measurement
stdev:
type: string (float)
description: The standard deviation of bandwidth
numsamples:
type: string (integer)
description: The number of samples taken during the test
iops:
type: object
properties:
min:
type: string (integer)
description: The minimum IOPS measurement
max:
type: string (integer)
description: The maximum IOPS measurement
mean:
type: string (float)
description: The mean IOPS measurement
stdev:
type: string (float)
description: The standard deviation of IOPS
numsamples:
type: string (integer)
description: The number of samples taken during the test
cpu:
type: object
properties:
user:
type: string (float percentage)
description: The percentage of test time spent in user space
system:
type: string (float percentage)
description: The percentage of test time spent in system (kernel) space
ctxsw:
type: string (integer)
description: The number of context switches during the test
majfault:
type: string (integer)
description: The number of major page faults during the test
minfault:
type: string (integer)
description: The number of minor page faults during the test
""" """
return list_benchmarks(config, reqargs.get("job", None)) return list_benchmarks(config, reqargs.get("job", None))
@ -5069,10 +5368,6 @@ class API_Storage_Ceph_Benchmark(Resource):
"required": True, "required": True,
"helptext": "A valid pool must be specified.", "helptext": "A valid pool must be specified.",
}, },
{
"name": "name",
"required": False,
},
] ]
) )
@Authenticator @Authenticator
@ -5088,17 +5383,17 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string type: string
required: true required: true
description: The PVC storage pool to benchmark description: The PVC storage pool to benchmark
- in: query
name: name
type: string
required: false
description: An optional override name for the job
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: string type: string
description: The Celery job ID of the benchmark (unused elsewhere) description: The Celery job ID of the task
400:
description: Bad request
schema:
type: object
id: Message
""" """
# Verify that the pool is valid # Verify that the pool is valid
_list, code = api_helper.ceph_pool_list( _list, code = api_helper.ceph_pool_list(
@ -5110,10 +5405,7 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400 }, 400
task = run_celery_task( task = run_celery_task(
"storage.benchmark", "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
pool=reqargs.get("pool", None),
name=reqargs.get("name", None),
run_on="primary",
) )
return ( return (
{ {
@ -5225,11 +5517,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
required: true required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5424,11 +5716,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
required: false required: false
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5539,11 +5831,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: false required: false
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5597,11 +5889,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: true required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5655,7 +5947,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
- in: query - in: query
name: force name: force
type: boolean type: boolean
required: flase required: false
description: Force removal even if some step(s) fail description: Force removal even if some step(s) fail
- in: query - in: query
name: yes-i-really-mean-it name: yes-i-really-mean-it
@ -5663,11 +5955,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: true required: true
description: A confirmation string to ensure that the API consumer really means it description: A confirmation string to ensure that the API consumer really means it
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
404: 404:
description: Not found description: Not found
schema: schema:
@ -6255,7 +6547,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if volume creation would violate 80% full soft cap on the pool description: Force action if volume creation would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6332,7 +6624,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if volume creation would violate 80% full soft cap on the pool description: Force action if volume creation would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6384,7 +6676,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if new volume size would violate 80% full soft cap on the pool description: Force action if new volume size would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6509,7 +6801,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if clone volume size would violate 80% full soft cap on the pool description: Force action if clone volume size would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -9364,14 +9656,11 @@ class API_Provisioner_Create_Root(Resource):
type: string type: string
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
properties: description: The Celery job ID of the task
task_id:
type: string
description: Task ID for the provisioner Celery worker
400: 400:
description: Bad request description: Bad request
schema: schema:

View File

@ -1280,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
) )
if retdata[0].get("state") not in ["stop", "disable"]: if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
return {"message": "VM must be stopped to flush locks"}, 400 return {"message": "VM must be stopped to flush locks"}, 400
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm) retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
@ -1294,6 +1294,58 @@ def vm_flush_locks(zkhandler, vm):
return output, retcode return output, retcode
def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
try:
import rados
import rbd
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not source_snapshot:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
image = rbd.Image(ioctx, volume)
last_chunk = 0
chunk_size = 1024 * 1024 * 128
if source_snapshot:
# Receiving diff data
print(f"Applying diff between {source_snapshot} and {snapshot}")
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
# Extract the offset and length (8 bytes each) and the data
offset = int.from_bytes(chunk[:8], 'big')
length = int.from_bytes(chunk[8:16], 'big')
data = chunk[16:16 + length]
image.write(data, offset)
image.create_snap(snapshot)
else:
# Receiving full image
print(f"Importing full snapshot {snapshot}")
while True:
chunk = flask.request.stream.read(chunk_size)
if not chunk:
break
image.write(chunk, last_chunk)
last_chunk += len(chunk)
image.create_snap(snapshot)
image.close()
ioctx.close()
cluster.shutdown()
except Exception as e:
return {"message": f"Failed to import block device: {e}"}, 400
# #
# Network functions # Network functions
# #

View File

@ -3755,13 +3755,6 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.") @click.command(name="run", short_help="Run a storage benchmark.")
@connection_req @connection_req
@click.argument("pool") @click.argument("pool")
@click.option(
"--name",
"name",
default=None,
show_default=False,
help="Use a custom name for the job",
)
@click.option( @click.option(
"--wait/--no-wait", "--wait/--no-wait",
"wait_flag", "wait_flag",
@ -3773,14 +3766,12 @@ def cli_storage_benchmark():
@confirm_opt( @confirm_opt(
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue" "Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
) )
def cli_storage_benchmark_run(pool, name, wait_flag): def cli_storage_benchmark_run(pool, wait_flag):
""" """
Run a storage benchmark on POOL in the background. Run a storage benchmark on POOL in the background.
""" """
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run( retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)
CLI_CONFIG, pool, name, wait_flag
)
if retcode and wait_flag: if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)

View File

@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}") output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
vm_states = ["start", "disable"] vm_states = ["start", "disable", "mirror"]
vm_states.extend( vm_states.extend(
[ [
state state
for state in data.get("vms", {}).keys() for state in data.get("vms", {}).keys()
if state not in ["total", "start", "disable"] if state not in ["total", "start", "disable", "mirror"]
] ]
) )
@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
continue continue
if state in ["start"]: if state in ["start"]:
state_colour = ansii["green"] state_colour = ansii["green"]
elif state in ["migrate", "disable", "provision"]: elif state in ["migrate", "disable", "provision", "mirror"]:
state_colour = ansii["blue"] state_colour = ansii["blue"]
elif state in ["stop", "fail"]: elif state in ["stop", "fail"]:
state_colour = ansii["red"] state_colour = ansii["red"]

View File

@ -30,7 +30,6 @@ from requests_toolbelt.multipart.encoder import (
import pvc.lib.ansiprint as ansiprint import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
from pvc.cli.helpers import MAX_CONTENT_WIDTH
# #
# Supplemental functions # Supplemental functions
@ -1725,17 +1724,15 @@ def format_list_snapshot(config, snapshot_list):
# #
# Benchmark functions # Benchmark functions
# #
def ceph_benchmark_run(config, pool, name, wait_flag): def ceph_benchmark_run(config, pool, wait_flag):
""" """
Run a storage benchmark against {pool} Run a storage benchmark against {pool}
API endpoint: POST /api/v1/storage/ceph/benchmark API endpoint: POST /api/v1/storage/ceph/benchmark
API arguments: pool={pool}, name={name} API arguments: pool={pool}
API schema: {message} API schema: {message}
""" """
params = {"pool": pool} params = {"pool": pool}
if name:
params["name"] = name
response = call_api(config, "post", "/storage/ceph/benchmark", params=params) response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
return get_wait_retdata(response, wait_flag) return get_wait_retdata(response, wait_flag)
@ -1807,7 +1804,7 @@ def get_benchmark_list_results(benchmark_format, benchmark_data):
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy(
benchmark_data benchmark_data
) )
elif benchmark_format == 1 or benchmark_format == 2: elif benchmark_format == 1:
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json(
benchmark_data benchmark_data
) )
@ -2009,7 +2006,6 @@ def format_info_benchmark(config, benchmark_information):
benchmark_matrix = { benchmark_matrix = {
0: format_info_benchmark_legacy, 0: format_info_benchmark_legacy,
1: format_info_benchmark_json, 1: format_info_benchmark_json,
2: format_info_benchmark_json,
} }
benchmark_version = benchmark_information[0]["test_format"] benchmark_version = benchmark_information[0]["test_format"]
@ -2344,15 +2340,12 @@ def format_info_benchmark_json(config, benchmark_information):
if benchmark_information["benchmark_result"] == "Running": if benchmark_information["benchmark_result"] == "Running":
return "Benchmark test is still running." return "Benchmark test is still running."
benchmark_format = benchmark_information["test_format"]
benchmark_details = benchmark_information["benchmark_result"] benchmark_details = benchmark_information["benchmark_result"]
# Format a nice output; do this line-by-line then concat the elements at the end # Format a nice output; do this line-by-line then concat the elements at the end
ainformation = [] ainformation = []
ainformation.append( ainformation.append(
"{}Storage Benchmark details (format {}):{}".format( "{}Storage Benchmark details:{}".format(ansiprint.bold(), ansiprint.end())
ansiprint.bold(), benchmark_format, ansiprint.end()
)
) )
nice_test_name_map = { nice_test_name_map = {
@ -2400,7 +2393,7 @@ def format_info_benchmark_json(config, benchmark_information):
if element[1] != 0: if element[1] != 0:
useful_latency_tree.append(element) useful_latency_tree.append(element)
max_rows = 5 max_rows = 9
if len(useful_latency_tree) > 9: if len(useful_latency_tree) > 9:
max_rows = len(useful_latency_tree) max_rows = len(useful_latency_tree)
elif len(useful_latency_tree) < 9: elif len(useful_latency_tree) < 9:
@ -2409,10 +2402,15 @@ def format_info_benchmark_json(config, benchmark_information):
# Format the static data # Format the static data
overall_label = [ overall_label = [
"BW/s:", "Overall BW/s:",
"IOPS:", "Overall IOPS:",
"I/O:", "Total I/O:",
"Time:", "Runtime (s):",
"User CPU %:",
"System CPU %:",
"Ctx Switches:",
"Major Faults:",
"Minor Faults:",
] ]
while len(overall_label) < max_rows: while len(overall_label) < max_rows:
overall_label.append("") overall_label.append("")
@ -2421,149 +2419,68 @@ def format_info_benchmark_json(config, benchmark_information):
format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])), format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])),
format_ops_tohuman(int(job_details[io_class]["iops"])), format_ops_tohuman(int(job_details[io_class]["iops"])),
format_bytes_tohuman(int(job_details[io_class]["io_bytes"])), format_bytes_tohuman(int(job_details[io_class]["io_bytes"])),
str(job_details["job_runtime"] / 1000) + "s", job_details["job_runtime"] / 1000,
job_details["usr_cpu"],
job_details["sys_cpu"],
job_details["ctx"],
job_details["majf"],
job_details["minf"],
] ]
while len(overall_data) < max_rows: while len(overall_data) < max_rows:
overall_data.append("") overall_data.append("")
cpu_label = [
"Total:",
"User:",
"Sys:",
"OSD:",
"MON:",
]
while len(cpu_label) < max_rows:
cpu_label.append("")
cpu_data = [
(
benchmark_details[test]["avg_cpu_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
round(job_details["usr_cpu"], 2),
round(job_details["sys_cpu"], 2),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(cpu_data) < max_rows:
cpu_data.append("")
memory_label = [
"Total:",
"OSD:",
"MON:",
]
while len(memory_label) < max_rows:
memory_label.append("")
memory_data = [
(
benchmark_details[test]["avg_memory_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(memory_data) < max_rows:
memory_data.append("")
network_label = [
"Total:",
"Sent:",
"Recv:",
]
while len(network_label) < max_rows:
network_label.append("")
network_data = [
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["total"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["sent"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["recv"])
)
if benchmark_format > 1
else "N/A"
),
]
while len(network_data) < max_rows:
network_data.append("")
bandwidth_label = [ bandwidth_label = [
"Min:", "Min:",
"Max:", "Max:",
"Mean:", "Mean:",
"StdDev:", "StdDev:",
"Samples:", "Samples:",
"",
"",
"",
"",
] ]
while len(bandwidth_label) < max_rows: while len(bandwidth_label) < max_rows:
bandwidth_label.append("") bandwidth_label.append("")
bandwidth_data = [ bandwidth_data = [
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024) format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024),
+ " / " format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024),
+ format_ops_tohuman(int(job_details[io_class]["iops_min"])), format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024),
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024) format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024),
+ " / " job_details[io_class]["bw_samples"],
+ format_ops_tohuman(int(job_details[io_class]["iops_max"])), "",
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024) "",
+ " / " "",
+ format_ops_tohuman(int(job_details[io_class]["iops_mean"])), "",
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
str(job_details[io_class]["bw_samples"])
+ " / "
+ str(job_details[io_class]["iops_samples"]),
] ]
while len(bandwidth_data) < max_rows: while len(bandwidth_data) < max_rows:
bandwidth_data.append("") bandwidth_data.append("")
lat_label = [ iops_data = [
"Min:", format_ops_tohuman(int(job_details[io_class]["iops_min"])),
"Max:", format_ops_tohuman(int(job_details[io_class]["iops_max"])),
"Mean:", format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
"StdDev:", format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
job_details[io_class]["iops_samples"],
"",
"",
"",
"",
] ]
while len(lat_label) < max_rows: while len(iops_data) < max_rows:
lat_label.append("") iops_data.append("")
lat_data = [ lat_data = [
int(job_details[io_class]["lat_ns"]["min"]) / 1000, int(job_details[io_class]["lat_ns"]["min"]) / 1000,
int(job_details[io_class]["lat_ns"]["max"]) / 1000, int(job_details[io_class]["lat_ns"]["max"]) / 1000,
int(job_details[io_class]["lat_ns"]["mean"]) / 1000, int(job_details[io_class]["lat_ns"]["mean"]) / 1000,
int(job_details[io_class]["lat_ns"]["stddev"]) / 1000, int(job_details[io_class]["lat_ns"]["stddev"]) / 1000,
"",
"",
"",
"",
"",
] ]
while len(lat_data) < max_rows: while len(lat_data) < max_rows:
lat_data.append("") lat_data.append("")
@ -2572,119 +2489,98 @@ def format_info_benchmark_json(config, benchmark_information):
lat_bucket_label = list() lat_bucket_label = list()
lat_bucket_data = list() lat_bucket_data = list()
for element in useful_latency_tree: for element in useful_latency_tree:
lat_bucket_label.append(element[0] + ":" if element[0] else "") lat_bucket_label.append(element[0])
lat_bucket_data.append(round(float(element[1]), 2) if element[1] else "") lat_bucket_data.append(element[1])
while len(lat_bucket_label) < max_rows:
lat_bucket_label.append("")
while len(lat_bucket_data) < max_rows:
lat_bucket_label.append("")
# Column default widths # Column default widths
overall_label_length = 5 overall_label_length = 0
overall_column_length = 0 overall_column_length = 0
cpu_label_length = 6 bandwidth_label_length = 0
cpu_column_length = 0 bandwidth_column_length = 11
memory_label_length = 6 iops_column_length = 4
memory_column_length = 0 latency_column_length = 12
network_label_length = 6
network_column_length = 6
bandwidth_label_length = 8
bandwidth_column_length = 0
latency_label_length = 7
latency_column_length = 0
latency_bucket_label_length = 0 latency_bucket_label_length = 0
latency_bucket_column_length = 0
# Column layout: # Column layout:
# Overall CPU Memory Network Bandwidth/IOPS Latency Percentiles # General Bandwidth IOPS Latency Percentiles
# --------- ----- ------- -------- -------------- -------- --------------- # --------- ---------- -------- -------- ---------------
# BW Total Total Total Min Min A # Size Min Min Min A
# IOPS Usr OSD Send Max Max B # BW Max Max Max B
# Time Sys MON Recv Mean Mean ... # IOPS Mean Mean Mean ...
# Size OSD StdDev StdDev Z # Runtime StdDev StdDev StdDev Z
# MON Samples # UsrCPU Samples Samples
# SysCPU
# CtxSw
# MajFault
# MinFault
# Set column widths # Set column widths
for item in overall_label:
_item_length = len(str(item))
if _item_length > overall_label_length:
overall_label_length = _item_length
for item in overall_data: for item in overall_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > overall_column_length: if _item_length > overall_column_length:
overall_column_length = _item_length overall_column_length = _item_length
for item in cpu_data: test_name_length = len(nice_test_name_map[test])
_item_length = len(str(item)) if test_name_length > overall_label_length + overall_column_length:
if _item_length > cpu_column_length: _diff = test_name_length - (overall_label_length + overall_column_length)
cpu_column_length = _item_length overall_column_length += _diff
for item in memory_data: for item in bandwidth_label:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > memory_column_length: if _item_length > bandwidth_label_length:
memory_column_length = _item_length bandwidth_label_length = _item_length
for item in network_data:
_item_length = len(str(item))
if _item_length > network_column_length:
network_column_length = _item_length
for item in bandwidth_data: for item in bandwidth_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > bandwidth_column_length: if _item_length > bandwidth_column_length:
bandwidth_column_length = _item_length bandwidth_column_length = _item_length
for item in iops_data:
_item_length = len(str(item))
if _item_length > iops_column_length:
iops_column_length = _item_length
for item in lat_data: for item in lat_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_column_length: if _item_length > latency_column_length:
latency_column_length = _item_length latency_column_length = _item_length
for item in lat_bucket_data: for item in lat_bucket_label:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_bucket_column_length: if _item_length > latency_bucket_label_length:
latency_bucket_column_length = _item_length latency_bucket_label_length = _item_length
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}{overall_label: <{overall_label_length}} {header_fill}{end_bold}".format( "{bold}\
{overall_label: <{overall_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket} \
{end_bold}".format(
bold=ansiprint.bold(), bold=ansiprint.bold(),
end_bold=ansiprint.end(), end_bold=ansiprint.end(),
overall_label=nice_test_name_map[test], overall_label=nice_test_name_map[test],
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
header_fill="-" bandwidth_label="",
* ( bandwidth_label_length=bandwidth_label_length,
(MAX_CONTENT_WIDTH if MAX_CONTENT_WIDTH <= 120 else 120) bandwidth="Bandwidth/s",
- len(nice_test_name_map[test]) bandwidth_length=bandwidth_column_length,
- 4 iops="IOPS",
), iops_length=iops_column_length,
) latency="Latency (μs)",
) latency_length=latency_column_length,
latency_bucket_label="Latency Buckets (μs/%)",
ainformation.append( latency_bucket_label_length=latency_bucket_label_length,
"{bold}\ latency_bucket="",
{overall_label: <{overall_label_length}} \
{cpu_label: <{cpu_label_length}} \
{memory_label: <{memory_label_length}} \
{network_label: <{network_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{latency_label: <{latency_label_length}} \
{latency_bucket_label: <{latency_bucket_label_length}}\
{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
overall_label="Overall",
overall_label_length=overall_label_length + overall_column_length + 1,
cpu_label="CPU (%)",
cpu_label_length=cpu_label_length + cpu_column_length + 1,
memory_label="Memory (%)",
memory_label_length=memory_label_length + memory_column_length + 1,
network_label="Network (bps)",
network_label_length=network_label_length + network_column_length + 1,
bandwidth_label="Bandwidth / IOPS",
bandwidth_label_length=bandwidth_label_length
+ bandwidth_column_length
+ 1,
latency_label="Latency (μs)",
latency_label_length=latency_label_length + latency_column_length + 1,
latency_bucket_label="Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length
+ latency_bucket_column_length,
) )
) )
@ -2692,20 +2588,14 @@ def format_info_benchmark_json(config, benchmark_information):
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}\ "{bold}\
{overall_label: <{overall_label_length}} \ {overall_label: >{overall_label_length}} \
{overall: <{overall_length}} \ {overall: <{overall_length}} \
{cpu_label: <{cpu_label_length}} \ {bandwidth_label: >{bandwidth_label_length}} \
{cpu: <{cpu_length}} \ {bandwidth: <{bandwidth_length}} \
{memory_label: <{memory_label_length}} \ {iops: <{iops_length}} \
{memory: <{memory_length}} \ {latency: <{latency_length}} \
{network_label: <{network_label_length}} \ {latency_bucket_label: >{latency_bucket_label_length}} \
{network: <{network_length}} \ {latency_bucket} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{latency_label: <{latency_label_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket}\
{end_bold}".format( {end_bold}".format(
bold="", bold="",
end_bold="", end_bold="",
@ -2713,24 +2603,12 @@ def format_info_benchmark_json(config, benchmark_information):
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
overall=overall_data[idx], overall=overall_data[idx],
overall_length=overall_column_length, overall_length=overall_column_length,
cpu_label=cpu_label[idx],
cpu_label_length=cpu_label_length,
cpu=cpu_data[idx],
cpu_length=cpu_column_length,
memory_label=memory_label[idx],
memory_label_length=memory_label_length,
memory=memory_data[idx],
memory_length=memory_column_length,
network_label=network_label[idx],
network_label_length=network_label_length,
network=network_data[idx],
network_length=network_column_length,
bandwidth_label=bandwidth_label[idx], bandwidth_label=bandwidth_label[idx],
bandwidth_label_length=bandwidth_label_length, bandwidth_label_length=bandwidth_label_length,
bandwidth=bandwidth_data[idx], bandwidth=bandwidth_data[idx],
bandwidth_length=bandwidth_column_length, bandwidth_length=bandwidth_column_length,
latency_label=lat_label[idx], iops=iops_data[idx],
latency_label_length=latency_label_length, iops_length=iops_column_length,
latency=lat_data[idx], latency=lat_data[idx],
latency_length=latency_column_length, latency_length=latency_column_length,
latency_bucket_label=lat_bucket_label[idx], latency_bucket_label=lat_bucket_label[idx],
@ -2739,4 +2617,4 @@ def format_info_benchmark_json(config, benchmark_information):
) )
) )
return "\n".join(ainformation) + "\n" return "\n".join(ainformation)

View File

@ -1760,6 +1760,7 @@ def format_info(config, domain_information, long_output):
"provision": ansiprint.blue(), "provision": ansiprint.blue(),
"restore": ansiprint.blue(), "restore": ansiprint.blue(),
"import": ansiprint.blue(), "import": ansiprint.blue(),
"mirror": ansiprint.blue(),
} }
ainformation.append( ainformation.append(
"{}State:{} {}{}{}".format( "{}State:{} {}{}{}".format(

View File

@ -19,34 +19,31 @@
# #
############################################################################### ###############################################################################
import os
import psutil
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import subprocess
from datetime import datetime from datetime import datetime
from json import loads, dumps from json import loads, dumps
from time import sleep
from daemon_lib.celery import start, fail, log_info, update, finish from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph import daemon_lib.ceph as pvc_ceph
# Define the current test format # Define the current test format
TEST_FORMAT = 2 TEST_FORMAT = 1
# We run a total of 8 tests, to give a generalized idea of performance on the cluster: # We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 64GB with a 4M block size # 1. A sequential read test of 8GB with a 4M block size
# 2. A sequential write test of 64GB with a 4M block size # 2. A sequential write test of 8GB with a 4M block size
# 3. A random read test of 64GB with a 4M block size # 3. A random read test of 8GB with a 4M block size
# 4. A random write test of 64GB with a 4M block size # 4. A random write test of 8GB with a 4M block size
# 5. A random read test of 64GB with a 256k block size # 5. A random read test of 8GB with a 256k block size
# 6. A random write test of 64GB with a 256k block size # 6. A random write test of 8GB with a 256k block size
# 7. A random read test of 64GB with a 4k block size # 7. A random read test of 8GB with a 4k block size
# 8. A random write test of 64GB with a 4k block size # 8. A random write test of 8GB with a 4k block size
# Taken together, these 8 results should give a very good indication of the overall storage performance # Taken together, these 8 results should give a very good indication of the overall storage performance
# for a variety of workloads. # for a variety of workloads.
test_matrix = { test_matrix = {
@ -103,7 +100,7 @@ test_matrix = {
# Specify the benchmark volume name and size # Specify the benchmark volume name and size
benchmark_volume_name = "pvcbenchmark" benchmark_volume_name = "pvcbenchmark"
benchmark_volume_size = "64G" benchmark_volume_size = "8G"
# #
@ -229,7 +226,7 @@ def cleanup_benchmark_volume(
def run_benchmark_job( def run_benchmark_job(
config, test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
): ):
test_spec = test_matrix[test] test_spec = test_matrix[test]
log_info(None, f"Running test '{test}'") log_info(None, f"Running test '{test}'")
@ -259,165 +256,31 @@ def run_benchmark_job(
) )
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split()))) log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
# Run the fio command manually instead of using our run_os_command wrapper
# This will help us gather statistics about this node while it's running
process = subprocess.Popen(
fio_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait 15 seconds for the test to start
log_info(None, "Waiting 15 seconds for test resource stabilization")
sleep(15)
# Set up function to get process CPU utilization by name
def get_cpu_utilization_by_name(process_name):
cpu_usage = 0
for proc in psutil.process_iter(["name", "cpu_percent"]):
if proc.info["name"] == process_name:
cpu_usage += proc.info["cpu_percent"]
return cpu_usage
# Set up function to get process memory utilization by name
def get_memory_utilization_by_name(process_name):
memory_usage = 0
for proc in psutil.process_iter(["name", "memory_percent"]):
if proc.info["name"] == process_name:
memory_usage += proc.info["memory_percent"]
return memory_usage
# Set up function to get network traffic utilization in bps
def get_network_traffic_bps(interface, duration=1):
# Get initial network counters
net_io_start = psutil.net_io_counters(pernic=True)
if interface not in net_io_start:
return None, None
stats_start = net_io_start[interface]
bytes_sent_start = stats_start.bytes_sent
bytes_recv_start = stats_start.bytes_recv
# Wait for the specified duration
sleep(duration)
# Get final network counters
net_io_end = psutil.net_io_counters(pernic=True)
stats_end = net_io_end[interface]
bytes_sent_end = stats_end.bytes_sent
bytes_recv_end = stats_end.bytes_recv
# Calculate bytes per second
bytes_sent_per_sec = (bytes_sent_end - bytes_sent_start) / duration
bytes_recv_per_sec = (bytes_recv_end - bytes_recv_start) / duration
# Convert to bits per second (bps)
bits_sent_per_sec = bytes_sent_per_sec * 8
bits_recv_per_sec = bytes_recv_per_sec * 8
bits_total_per_sec = bits_sent_per_sec + bits_recv_per_sec
return bits_sent_per_sec, bits_recv_per_sec, bits_total_per_sec
log_info(None, f"Starting system resource polling for test '{test}'")
storage_interface = config["storage_dev"]
total_cpus = psutil.cpu_count(logical=True)
ticks = 1
osd_cpu_utilization = 0
osd_memory_utilization = 0
mon_cpu_utilization = 0
mon_memory_utilization = 0
total_cpu_utilization = 0
total_memory_utilization = 0
storage_sent_bps = 0
storage_recv_bps = 0
storage_total_bps = 0
while process.poll() is None:
# Do collection of statistics like network bandwidth and cpu utilization
current_osd_cpu_utilization = get_cpu_utilization_by_name("ceph-osd")
current_osd_memory_utilization = get_memory_utilization_by_name("ceph-osd")
current_mon_cpu_utilization = get_cpu_utilization_by_name("ceph-mon")
current_mon_memory_utilization = get_memory_utilization_by_name("ceph-mon")
current_total_cpu_utilization = psutil.cpu_percent(interval=1)
current_total_memory_utilization = psutil.virtual_memory().percent
(
current_storage_sent_bps,
current_storage_recv_bps,
current_storage_total_bps,
) = get_network_traffic_bps(storage_interface)
# Recheck if the process is done yet; if it's not, we add the values and increase the ticks
# This helps ensure that if the process finishes earlier than the longer polls above,
# this particular tick isn't counted which can skew the average
if process.poll() is None:
osd_cpu_utilization += current_osd_cpu_utilization
osd_memory_utilization += current_osd_memory_utilization
mon_cpu_utilization += current_mon_cpu_utilization
mon_memory_utilization += current_mon_memory_utilization
total_cpu_utilization += current_total_cpu_utilization
total_memory_utilization += current_total_memory_utilization
storage_sent_bps += current_storage_sent_bps
storage_recv_bps += current_storage_recv_bps
storage_total_bps += current_storage_total_bps
ticks += 1
# Get the 1-minute load average and CPU utilization, which covers the test duration
load1, _, _ = os.getloadavg()
load1 = round(load1, 2)
# Calculate the average CPU utilization values over the runtime
# Divide the OSD and MON CPU utilization by the total number of CPU cores, because
# the total is divided this way
avg_osd_cpu_utilization = round(osd_cpu_utilization / ticks / total_cpus, 2)
avg_osd_memory_utilization = round(osd_memory_utilization / ticks, 2)
avg_mon_cpu_utilization = round(mon_cpu_utilization / ticks / total_cpus, 2)
avg_mon_memory_utilization = round(mon_memory_utilization / ticks, 2)
avg_total_cpu_utilization = round(total_cpu_utilization / ticks, 2)
avg_total_memory_utilization = round(total_memory_utilization / ticks, 2)
avg_storage_sent_bps = round(storage_sent_bps / ticks, 2)
avg_storage_recv_bps = round(storage_recv_bps / ticks, 2)
avg_storage_total_bps = round(storage_total_bps / ticks, 2)
stdout, stderr = process.communicate()
retcode = process.returncode
resource_data = {
"avg_cpu_util_percent": {
"total": avg_total_cpu_utilization,
"ceph-mon": avg_mon_cpu_utilization,
"ceph-osd": avg_osd_cpu_utilization,
},
"avg_memory_util_percent": {
"total": avg_total_memory_utilization,
"ceph-mon": avg_mon_memory_utilization,
"ceph-osd": avg_osd_memory_utilization,
},
"avg_network_util_bps": {
"sent": avg_storage_sent_bps,
"recv": avg_storage_recv_bps,
"total": avg_storage_total_bps,
},
}
try: try:
jstdout = loads(stdout) jstdout = loads(stdout)
if retcode: if retcode:
raise raise
except Exception: except Exception:
return None, None cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}': {stderr}",
)
return resource_data, jstdout return jstdout
def worker_run_benchmark(zkhandler, celery, config, pool, name): def worker_run_benchmark(zkhandler, celery, config, pool):
# Phase 0 - connect to databases # Phase 0 - connect to databases
if not name: cur_time = datetime.now().isoformat(timespec="seconds")
cur_time = datetime.now().isoformat(timespec="seconds") cur_primary = zkhandler.read("base.config.primary_node")
cur_primary = zkhandler.read("base.config.primary_node") job_name = f"{cur_time}_{cur_primary}"
job_name = f"{cur_time}_{cur_primary}"
else:
job_name = name
current_stage = 0 current_stage = 0
total_stages = 13 total_stages = 13
@ -495,8 +358,7 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
total=total_stages, total=total_stages,
) )
resource_data, fio_data = run_benchmark_job( results[test] = run_benchmark_job(
config,
test, test,
pool, pool,
job_name=job_name, job_name=job_name,
@ -504,25 +366,6 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
db_cur=db_cur, db_cur=db_cur,
zkhandler=zkhandler, zkhandler=zkhandler,
) )
if resource_data is None or fio_data is None:
cleanup_benchmark_volume(
pool,
job_name=job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}'",
)
results[test] = {**resource_data, **fio_data}
# Phase 3 - cleanup # Phase 3 - cleanup
current_stage += 1 current_stage += 1

View File

@ -1160,6 +1160,7 @@ def get_resource_metrics(zkhandler):
"fail": 8, "fail": 8,
"import": 9, "import": 9,
"restore": 10, "restore": 10,
"mirror": 99,
} }
state = vm["state"] state = vm["state"]
output_lines.append( output_lines.append(

View File

@ -85,6 +85,7 @@ vm_state_combinations = [
"provision", "provision",
"import", "import",
"restore", "restore",
"mirror",
] ]
ceph_osd_state_combinations = [ ceph_osd_state_combinations = [
"up,in", "up,in",

View File

@ -24,6 +24,7 @@ import re
import os.path import os.path
import lxml.objectify import lxml.objectify
import lxml.etree import lxml.etree
import subprocess
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
@ -580,7 +581,7 @@ def rename_vm(zkhandler, domain, new_domain):
# Verify that the VM is in a stopped state; renaming is not supported otherwise # Verify that the VM is in a stopped state; renaming is not supported otherwise
state = zkhandler.read(("domain.state", dom_uuid)) state = zkhandler.read(("domain.state", dom_uuid))
if state not in ["stop", "disable"]: if state not in ["stop", "disable", "mirror"]:
return ( return (
False, False,
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format( 'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
@ -1125,6 +1126,7 @@ def get_list(
"migrate", "migrate",
"unmigrate", "unmigrate",
"provision", "provision",
"mirror",
] ]
if state not in valid_states: if state not in valid_states:
return False, 'VM state "{}" is not valid.'.format(state) return False, 'VM state "{}" is not valid.'.format(state)
@ -1903,10 +1905,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
# Check that the domain is stopped (unless force_unlock is set) # Check that the domain is stopped (unless force_unlock is set)
domain_state = zkhandler.read(("domain.state", dom_uuid)) domain_state = zkhandler.read(("domain.state", dom_uuid))
if not force_unlock and domain_state not in ["stop", "disable", "fail"]: if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
fail( fail(
celery, celery,
f"VM state {domain_state} not in [stop, disable, fail] and not forcing", f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
) )
return False return False
@ -2329,7 +2331,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
# Verify that the VM is in a stopped state; renaming is not supported otherwise # Verify that the VM is in a stopped state; renaming is not supported otherwise
state = zkhandler.read(("domain.state", dom_uuid)) state = zkhandler.read(("domain.state", dom_uuid))
if state not in ["stop", "disable"]: if state not in ["stop", "disable", "mirror"]:
fail( fail(
celery, celery,
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running", f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
@ -3118,3 +3120,389 @@ def vm_worker_import_snapshot(
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
) )
def vm_worker_send_snapshot(
zkhandler,
celery,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
):
import requests
from packaging.version import parse as parse_version
current_stage = 0
total_stages = 1
start(
celery,
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
current=current_stage,
total=total_stages,
)
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
# Get our side's VM configuration details
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
if not isinstance(vm_detail, dict):
fail(celery, f"VM listing returned invalid data: {vm_detail}",)
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Validate that the destination cluster can be reached
destination_api_timeout = (3.05, 172800)
destination_api_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/octet-stream",
}
try:
# Hit the API root; this should return "PVC API version x"
response = requests.get(
f"{destination_api_uri}/",
timeout=destination_api_timeout,
headers=None,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = requests.get(
f"{destination_api_uri}/status",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
destination_cluster_status = response.json()
current_destination_pvc_version = destination_cluster_status.get(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
"Connection to remote API failed: no PVC version information returned",
)
return False
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
# Check if the VM already exists on the remote
response = requests.get(
f"{destination_api_uri}/vm/{domain}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
destination_vm_status = response.json()
current_destination_vm_state = destination_vm_status.get(
"state", None
)
if current_destination_vm_state is not None and current_destination_vm_state != "mirror":
fail(
celery,
f"Remote PVC VM exists and is not a mirror",
)
return False
# Get details about VM snapshot
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
[
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.name",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.timestamp",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.xml",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.rbd_snapshots",
snapshot_name,
)
),
]
)
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
# Get details about remote VM snapshots
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
latest_destination_vm_snapshot = destination_vm_snapshots[0]
# Check if this snapshot is in the remote list already
if snapshot_name in [s['name'] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
# Check if this snapshot is older than the latest remote VM snapshot
if snapshot_timestamp < latest_destination_vm_snapshot["timestamp"]:
fail(
celery,
f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
)
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
if increment_parent not in [s['name'] for s in destination_vm_snapshots]:
fail(
celery,
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
# Set send type
send_type = "incremental" if incremental_parent is not None else "full"
# Begin send, set stages
total_stages = 2 + (2 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
# Create the block devices on the remote side if this is a new VM send
for rbd in [r for r in vm_detail["disks"] if r['type'] == 'rbd']:
pool, volume = rbd["name"].split('/')
current_stage += 1
update(
celery,
f"Preparing remote volume for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Get the storage volume details
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
if not retcode or len(retdata) != 1:
if len(retdata) < 1:
error_message = f"No detail returned for volume {rbd}"
elif len(retdata) > 1:
error_message = f"Multiple details returned for volume {rbd}"
else:
error_message = f"Error getting details for volume {rbd}"
fail(
celery,
error_message,
)
return False
try:
size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
except Exception as e:
error_message = f"Failed to get volume size for {rbd}: {e}"
if destination_storage_pool is not None:
pool = destination_storage_pool
if current_destination_vm_state is None:
current_stage += 1
update(
celery,
f"Creating remote volume {pool}/{volume} for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Check if the volume exists on the target
response = requests.get(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if response.status_code != 404:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
# Create the volume on the target
params = {
"size": size_bytes,
}
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=params,
data=None,
verify=destination_api_verify_ssl,
)
destination_volume_create_status = response.json()
if response.status_code != 200:
fail(
celery,
f"Failed to create volume {pool}/{volume} on target: {destination_volume_create_status['message']}",
)
return False
# Send the volume to the remote
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd.Image(ioctx, name=volume, snapshot=snapshot, read_only=True)
size = image.size()
chunk_size_mb = 128
if incremental_parent is not None:
# Diff between incremental_parent and snapshot
def diff_chunker():
def diff_cb(offset, length, exists):
""" Callback to handle diff regions """
if exists:
data = image.read(offset, length)
yield (offset.to_bytes(8, 'big') + length.to_bytes(8, 'big') + data)
image.set_snap(incremental_parent)
image.diff_iterate(0, size, source_snapshot, diff_cb)
data_stream = diff_chunker()
celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd}")
else:
# Full image transfer
def chunker():
d_start = time.time()
chunk_size = 1024 * 1024 * chunk_size_mb
current_chunk = 0
while current_chunk < size:
t_end = time.time()
t_tot = t_end - t_start
chunk = image.read(current_chunk, chunk_size)
yield chunk
current_chunk += chunk_size
data_stream = chunker()
celery_message = f"Sending full image of {rbd}@{snapshot_name}"
current_stage += 1
update(
celery,
f"Sending volume {rbd}@{snapshot_name} to target ({send_type})",
current=current_stage,
total=total_stages,
)
send_params = {
'pool': pool,
'volume': volume,
'snapshot': snapshot_name,
'size': size,
'source_snapshot': incremental_parent,
}
send_headers = {
'X-Api-Key': destination_api_key,,
'Content-Type': 'application/octet-stream',
'Transfer-Encoding': None # Disable chunked transfer encoding
}
try:
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
data=data_stream,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send snapshot: {e}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
# Send the VM configuration
if current_destination_vm_state is None:
# This is a new VM, so define it
#response = requests.post()
else:
# This is a modification
#response = requests.post()

View File

@ -33,6 +33,7 @@ from daemon_lib.vm import (
vm_worker_rollback_snapshot, vm_worker_rollback_snapshot,
vm_worker_export_snapshot, vm_worker_export_snapshot,
vm_worker_import_snapshot, vm_worker_import_snapshot,
vm_worker_send_snapshot,
) )
from daemon_lib.ceph import ( from daemon_lib.ceph import (
osd_worker_add_osd, osd_worker_add_osd,
@ -96,12 +97,12 @@ def create_vm(
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on") @celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, name=None, run_on="primary"): def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config) @ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool, name): def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool, name) return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool, name) return run_storage_benchmark(self, pool)
@celery.task(name="cluster.autobackup", bind=True, routing_key="run_on") @celery.task(name="cluster.autobackup", bind=True, routing_key="run_on")
@ -227,6 +228,39 @@ def vm_import_snapshot(
) )
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
def vm_send_snapshot(
self,
domain=None,
snapshot_name=None,
destination_api_uri="",
destination_api_key="",
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
run_on="primary",
):
@ZKConnection(config)
def run_vm_send_snapshot(
zkhandler, self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=True, incremental_parent=None, destination_storage_pool=None,
):
return vm_worker_send_snapshot(
zkhandler,
self,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
)
return run_vm_send_snapshot(
self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, incremental_parent=incremental_parent, destination_storage_pool=destination_storage_pool
)
@celery.task(name="osd.add", bind=True, routing_key="run_on") @celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add( def osd_add(
self, self,