Compare commits

...

7 Commits

Author SHA1 Message Date
Joshua Boniface 41cd34ba4d Allow specifying job names for benchmarks 2024-09-18 14:55:12 -04:00
Joshua Boniface 736762901c Update benchmarks to include resource utilization
Adds additional polled information on node cpu, memory, and network
bandwidth for the node running the test. This should provide additional
useful information about the results of the test.

Also bumps the test format to 2 to ensure clients can handle the changes
properly.
2024-09-18 14:32:03 -04:00
Joshua Boniface ecb812ccac Update linting for pvcapid recent changes 2024-09-18 10:18:50 -04:00
Joshua Boniface a2e5df9f6d Add support for Gunicorn execution
Modifies pvcapid to run under Gunicorn when in non-debug mode, instead
of the Flask development server. This is proper practice for one, and
also helps increase performance slightly in some workloads (file uploads
mainly).
2024-09-09 13:20:03 -04:00
Joshua Boniface 73c0834f85 Remove headers and add util to short output 2024-09-06 11:40:39 -04:00
Joshua Boniface 2de999c700 Add total cluster utilization stats
Useful for evaluating the cluster resources as a whole.
2024-09-05 16:05:33 -04:00
Joshua Boniface 7543eb839d Add dedicated volume scan endpoint
Allows an imported volume to be scanned for stats independently.

Designed to be used as part of a snapshot import via the API: the "create"
can happen before the real import (to check for available space, etc.), and
this scan can then be run afterwards, once the RBD volume actually exists.
2024-09-03 20:32:27 -04:00
12 changed files with 891 additions and 340 deletions

View File

@ -19,6 +19,13 @@
#
###############################################################################
import pvcapid.Daemon # noqa: F401
import sys
from os import path
# Ensure current directory (/usr/share/pvc) is in the system path for Gunicorn
current_dir = path.dirname(path.abspath(__file__))
sys.path.append(current_dir)
import pvcapid.Daemon # noqa: F401, E402
pvcapid.Daemon.entrypoint()

View File

@ -19,15 +19,13 @@
#
###############################################################################
import subprocess
from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg
# Daemon version
version = "0.9.100"
version = "0.9.100~git-73c0834f"
# API version
API_VERSION = 1.0
@ -53,7 +51,6 @@ def strtobool(stringv):
# Configuration Parsing
##########################################################
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvcapid"
@ -61,22 +58,16 @@ config["daemon_version"] = version
##########################################################
# Entrypoint
# Flask App Creation for Gunicorn
##########################################################
def entrypoint():
import pvcapid.flaskapi as pvc_api # noqa: E402
if config["api_ssl_enabled"]:
context = SSLContext()
context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs()
context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
context = None
def create_app():
"""
Create and return the Flask app and SSL context if necessary.
"""
# Import the Flask app from pvcapid.flaskapi after adjusting the path
import pvcapid.flaskapi as pvc_api
# Print our startup messages
print("")
@ -102,9 +93,69 @@ def entrypoint():
print("")
pvc_api.celery_startup()
pvc_api.app.run(
config["api_listen_address"],
config["api_listen_port"],
threaded=True,
ssl_context=context,
)
return pvc_api.app
##########################################################
# Entrypoint
##########################################################
def entrypoint():
    """
    Start the PVC API daemon.

    When config["debug"] is truthy, the Flask app returned by create_app() is
    served in-process via app.run(), using an SSLContext built from the
    configured certificate and key if config["api_ssl_enabled"] is set.

    Otherwise Gunicorn is executed as a child process serving
    "pvcapid.Daemon:create_app()", and this function blocks until it exits.

    Raises:
        SystemExit: with status 0 on KeyboardInterrupt, status 1 if Gunicorn
            could not be run at all, or Gunicorn's own exit status when that
            status is non-zero.
    """
    if config["debug"]:
        app = create_app()

        if config["api_ssl_enabled"]:
            # NOTE(review): TLSv1 is a very permissive minimum version; it is
            # left unchanged here to avoid breaking existing debug clients.
            ssl_context = SSLContext()
            ssl_context.minimum_version = TLSVersion.TLSv1
            ssl_context.load_cert_chain(
                config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
            )
        else:
            ssl_context = None

        app.run(
            config["api_listen_address"],
            config["api_listen_port"],
            threaded=True,
            ssl_context=ssl_context,
        )
        return

    # Build the command to run Gunicorn
    # NOTE(review): the 86400s worker timeout presumably accommodates very
    # long-running requests (e.g. large uploads) - confirm.
    bind_address = f"{config['api_listen_address']}:{config['api_listen_port']}"
    gunicorn_cmd = [
        "gunicorn",
        "--workers",
        "1",
        "--threads",
        "8",
        "--timeout",
        "86400",
        "--bind",
        bind_address,
        "pvcapid.Daemon:create_app()",
        "--log-level",
        "info",
        "--access-logfile",
        "-",
        "--error-logfile",
        "-",
    ]
    if config["api_ssl_enabled"]:
        gunicorn_cmd += [
            "--certfile",
            config["api_ssl_cert_file"],
            "--keyfile",
            config["api_ssl_key_file"],
        ]

    # Run Gunicorn; SystemExit is raised directly rather than relying on the
    # site-provided exit() helper, which is not guaranteed to exist.
    try:
        result = subprocess.run(gunicorn_cmd)
    except KeyboardInterrupt:
        raise SystemExit(0)
    except Exception as e:
        # Top-level boundary: report whatever prevented Gunicorn from starting
        print(e)
        raise SystemExit(1)

    # Propagate a failed Gunicorn run so the service supervisor sees the failure
    # instead of a clean (0) exit.
    if result.returncode != 0:
        raise SystemExit(result.returncode)

View File

@ -576,6 +576,63 @@ class API_Status(Resource):
snapshots:
type: integer
description: The total number of snapshots in the storage cluster
resources:
type: object
properties:
memory:
type: object
properties:
total:
type: integer
description: The total amount of RAM (all nodes) in MB
used:
type: integer
description: The total used RAM (all nodes) in MB
free:
type: integer
description: The total free RAM (all nodes) in MB
allocated:
type: integer
description: The total amount of RAM allocated to running domains in MB
provisioned:
type: integer
description: The total amount of RAM provisioned to all domains (regardless of state) in MB
utilization:
type: float
description: The memory utilization percentage (average) of the cluster
vcpu:
type: object
properties:
total:
type: integer
description: The total number of real CPU cores (all nodes)
load:
type: float
description: The current 5-minute CPU load (all nodes summed)
allocated:
type: integer
description: The total number of vCPUs allocated to running domains
provisioned:
type: integer
description: The total number of vCPUs provisioned to all domains (regardless of state)
utilization:
type: float
description: The CPU utilization percentage (average) of the cluster
disk:
type: object
properties:
total:
type: integer
description: The total size of all OSDs in KB
used:
type: integer
description: The total used size of all OSDs in KB
free:
type: integer
description: The total free size of all OSDs in KB
utilization:
type: float
description: The disk utilization percentage (average) of the cluster
400:
description: Bad request
"""
@ -5001,95 +5058,7 @@ class API_Storage_Ceph_Benchmark(Resource):
description: The PVC benchmark format of the results
benchmark_result:
type: object
description: A format 0 test result
properties:
test_name:
type: object
properties:
overall:
type: object
properties:
iosize:
type: string (integer)
description: The total size of the benchmark data
bandwidth:
type: string (integer)
description: The average bandwidth (KiB/s)
iops:
type: string (integer)
description: The average IOPS
runtime:
type: string (integer)
description: The total test time in milliseconds
latency:
type: object
properties:
min:
type: string (integer)
description: The minimum latency measurement
max:
type: string (integer)
description: The maximum latency measurement
mean:
type: string (float)
description: The mean latency measurement
stdev:
type: string (float)
description: The standard deviation of latency
bandwidth:
type: object
properties:
min:
type: string (integer)
description: The minimum bandwidth (KiB/s) measurement
max:
type: string (integer)
description: The maximum bandwidth (KiB/s) measurement
mean:
type: string (float)
description: The mean bandwidth (KiB/s) measurement
stdev:
type: string (float)
description: The standard deviation of bandwidth
numsamples:
type: string (integer)
description: The number of samples taken during the test
iops:
type: object
properties:
min:
type: string (integer)
description: The minimum IOPS measurement
max:
type: string (integer)
description: The maximum IOPS measurement
mean:
type: string (float)
description: The mean IOPS measurement
stdev:
type: string (float)
description: The standard deviation of IOPS
numsamples:
type: string (integer)
description: The number of samples taken during the test
cpu:
type: object
properties:
user:
type: string (float percentage)
description: The percentage of test time spent in user space
system:
type: string (float percentage)
description: The percentage of test time spent in system (kernel) space
ctxsw:
type: string (integer)
description: The number of context switches during the test
majfault:
type: string (integer)
description: The number of major page faults during the test
minfault:
type: string (integer)
description: The number of minor page faults during the test
description: A benchmark test result; format not documented due to complexity
"""
return list_benchmarks(config, reqargs.get("job", None))
@ -5100,6 +5069,10 @@ class API_Storage_Ceph_Benchmark(Resource):
"required": True,
"helptext": "A valid pool must be specified.",
},
{
"name": "name",
"required": False,
},
]
)
@Authenticator
@ -5115,6 +5088,11 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string
required: true
description: The PVC storage pool to benchmark
- in: query
name: name
type: string
required: false
description: An optional override name for the job
responses:
200:
description: OK
@ -5132,7 +5110,10 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400
task = run_celery_task(
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
"storage.benchmark",
pool=reqargs.get("pool", None),
name=reqargs.get("name", None),
run_on="primary",
)
return (
{
@ -6461,6 +6442,41 @@ api.add_resource(
)
# /storage/ceph/volume/<pool>/<volume>/scan
class API_Storage_Ceph_Volume_Element_Scan(Resource):
# REST resource for the /storage/ceph/volume/<pool>/<volume>/scan endpoint; the
# POST handler simply delegates to api_helper.ceph_volume_scan(pool, volume).
# NOTE(review): the spec in the docstring below has an empty "parameters:" key
# and documents a 404 response; whether the helper can actually return 404 is
# not visible from this class - confirm against api_helper.ceph_volume_scan.
@Authenticator
def post(self, pool, volume):
"""
Scan a Ceph volume {volume} in pool {pool} for stats (after import)
---
tags:
- storage / ceph
parameters:
responses:
200:
description: OK
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
"""
return api_helper.ceph_volume_scan(pool, volume)
api.add_resource(
API_Storage_Ceph_Volume_Element_Scan, "/storage/ceph/volume/<pool>/<volume>/scan"
)
# /storage/ceph/volume/<pool>/<volume>/clone
class API_Storage_Ceph_Volume_Element_Clone(Resource):
@RequestParser(

View File

@ -1996,6 +1996,22 @@ def ceph_volume_list(zkhandler, pool=None, limit=None, is_fuzzy=True):
return retdata, retcode
@ZKConnection(config)
def ceph_volume_scan(zkhandler, pool, name):
    """
    Trigger a stats (re)scan of the Ceph RBD volume {name} in pool {pool}.

    Returns a ({"message": ...}, HTTP status) tuple: status 200 when
    pvc_ceph.scan_volume reports success, 400 otherwise.
    """
    success, message = pvc_ceph.scan_volume(zkhandler, pool, name)
    status = 200 if success else 400

    # Double quotes in the message are swapped for single quotes before returning
    response = {"message": message.replace('"', "'")}
    return response, status
@ZKConnection(config)
def ceph_volume_add(zkhandler, pool, name, size, force_flag=False):
"""

View File

@ -3755,6 +3755,13 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.")
@connection_req
@click.argument("pool")
@click.option(
"--name",
"name",
default=None,
show_default=False,
help="Use a custom name for the job",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
@ -3766,12 +3773,14 @@ def cli_storage_benchmark():
@confirm_opt(
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
)
def cli_storage_benchmark_run(pool, wait_flag):
def cli_storage_benchmark_run(pool, name, wait_flag):
"""
Run a storage benchmark on POOL in the background.
"""
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(
CLI_CONFIG, pool, name, wait_flag
)
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)

View File

@ -83,6 +83,37 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
total_volumes = data.get("volumes", 0)
total_snapshots = data.get("snapshots", 0)
total_cpu_total = data.get("resources", {}).get("cpu", {}).get("total", 0)
total_cpu_load = data.get("resources", {}).get("cpu", {}).get("load", 0)
total_cpu_utilization = (
data.get("resources", {}).get("cpu", {}).get("utilization", 0)
)
total_cpu_string = (
f"{total_cpu_utilization:.1f}% ({total_cpu_load:.1f} / {total_cpu_total})"
)
total_memory_total = (
data.get("resources", {}).get("memory", {}).get("total", 0) / 1024
)
total_memory_used = (
data.get("resources", {}).get("memory", {}).get("used", 0) / 1024
)
total_memory_utilization = (
data.get("resources", {}).get("memory", {}).get("utilization", 0)
)
total_memory_string = f"{total_memory_utilization:.1f}% ({total_memory_used:.1f} GB / {total_memory_total:.1f} GB)"
total_disk_total = (
data.get("resources", {}).get("disk", {}).get("total", 0) / 1024 / 1024
)
total_disk_used = (
data.get("resources", {}).get("disk", {}).get("used", 0) / 1024 / 1024
)
total_disk_utilization = round(
data.get("resources", {}).get("disk", {}).get("utilization", 0)
)
total_disk_string = f"{total_disk_utilization:.1f}% ({total_disk_used:.1f} GB / {total_disk_total:.1f} GB)"
if maintenance == "true" or health == -1:
health_colour = ansii["blue"]
elif health > 90:
@ -94,12 +125,9 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
output = list()
output.append(f"{ansii['bold']}PVC cluster status:{ansii['end']}")
output.append("")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append("")
if health != "-1":
@ -111,7 +139,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
@ -136,7 +164,17 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
)
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
else:
messages = "None"
output.append(f"{ansii['purple']}Active faults:{ansii['end']} {messages}")
output.append(f"{ansii['purple']}Total CPU:{ansii['end']} {total_cpu_string}")
output.append(
f"{ansii['purple']}Total memory:{ansii['end']} {total_memory_string}"
)
output.append(f"{ansii['purple']}Total disk:{ansii['end']} {total_disk_string}")
output.append("")
@ -166,7 +204,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
nodes_string = ", ".join(nodes_strings)
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
vm_states = ["start", "disable"]
vm_states.extend(
@ -196,7 +234,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
vms_string = ", ".join(vms_strings)
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
osd_states = ["up,in"]
osd_states.extend(
@ -222,15 +260,15 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
osds_string = " ".join(osds_strings)
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append("")
@ -258,9 +296,6 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
output = list()
output.append(f"{ansii['bold']}PVC cluster status:{ansii['end']}")
output.append("")
if health != "-1":
health = f"{health}%"
else:
@ -270,7 +305,7 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
@ -295,7 +330,48 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
)
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
else:
messages = "None"
output.append(f"{ansii['purple']}Active faults:{ansii['end']} {messages}")
total_cpu_total = data.get("resources", {}).get("cpu", {}).get("total", 0)
total_cpu_load = data.get("resources", {}).get("cpu", {}).get("load", 0)
total_cpu_utilization = (
data.get("resources", {}).get("cpu", {}).get("utilization", 0)
)
total_cpu_string = (
f"{total_cpu_utilization:.1f}% ({total_cpu_load:.1f} / {total_cpu_total})"
)
total_memory_total = (
data.get("resources", {}).get("memory", {}).get("total", 0) / 1024
)
total_memory_used = (
data.get("resources", {}).get("memory", {}).get("used", 0) / 1024
)
total_memory_utilization = (
data.get("resources", {}).get("memory", {}).get("utilization", 0)
)
total_memory_string = f"{total_memory_utilization:.1f}% ({total_memory_used:.1f} GB / {total_memory_total:.1f} GB)"
total_disk_total = (
data.get("resources", {}).get("disk", {}).get("total", 0) / 1024 / 1024
)
total_disk_used = (
data.get("resources", {}).get("disk", {}).get("used", 0) / 1024 / 1024
)
total_disk_utilization = round(
data.get("resources", {}).get("disk", {}).get("utilization", 0)
)
total_disk_string = f"{total_disk_utilization:.1f}% ({total_disk_used:.1f} GB / {total_disk_total:.1f} GB)"
output.append(f"{ansii['purple']}CPU usage:{ansii['end']} {total_cpu_string}")
output.append(
f"{ansii['purple']}Memory usage:{ansii['end']} {total_memory_string}"
)
output.append(f"{ansii['purple']}Disk usage:{ansii['end']} {total_disk_string}")
output.append("")

View File

@ -30,6 +30,7 @@ from requests_toolbelt.multipart.encoder import (
import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
from pvc.cli.helpers import MAX_CONTENT_WIDTH
#
# Supplemental functions
@ -1724,15 +1725,17 @@ def format_list_snapshot(config, snapshot_list):
#
# Benchmark functions
#
def ceph_benchmark_run(config, pool, wait_flag):
def ceph_benchmark_run(config, pool, name, wait_flag):
"""
Run a storage benchmark against {pool}
API endpoint: POST /api/v1/storage/ceph/benchmark
API arguments: pool={pool}
API arguments: pool={pool}, name={name}
API schema: {message}
"""
params = {"pool": pool}
if name:
params["name"] = name
response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
return get_wait_retdata(response, wait_flag)
@ -1804,7 +1807,7 @@ def get_benchmark_list_results(benchmark_format, benchmark_data):
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy(
benchmark_data
)
elif benchmark_format == 1:
elif benchmark_format == 1 or benchmark_format == 2:
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json(
benchmark_data
)
@ -2006,6 +2009,7 @@ def format_info_benchmark(config, benchmark_information):
benchmark_matrix = {
0: format_info_benchmark_legacy,
1: format_info_benchmark_json,
2: format_info_benchmark_json,
}
benchmark_version = benchmark_information[0]["test_format"]
@ -2340,12 +2344,15 @@ def format_info_benchmark_json(config, benchmark_information):
if benchmark_information["benchmark_result"] == "Running":
return "Benchmark test is still running."
benchmark_format = benchmark_information["test_format"]
benchmark_details = benchmark_information["benchmark_result"]
# Format a nice output; do this line-by-line then concat the elements at the end
ainformation = []
ainformation.append(
"{}Storage Benchmark details:{}".format(ansiprint.bold(), ansiprint.end())
"{}Storage Benchmark details (format {}):{}".format(
ansiprint.bold(), benchmark_format, ansiprint.end()
)
)
nice_test_name_map = {
@ -2393,7 +2400,7 @@ def format_info_benchmark_json(config, benchmark_information):
if element[1] != 0:
useful_latency_tree.append(element)
max_rows = 9
max_rows = 5
if len(useful_latency_tree) > 9:
max_rows = len(useful_latency_tree)
elif len(useful_latency_tree) < 9:
@ -2402,15 +2409,10 @@ def format_info_benchmark_json(config, benchmark_information):
# Format the static data
overall_label = [
"Overall BW/s:",
"Overall IOPS:",
"Total I/O:",
"Runtime (s):",
"User CPU %:",
"System CPU %:",
"Ctx Switches:",
"Major Faults:",
"Minor Faults:",
"BW/s:",
"IOPS:",
"I/O:",
"Time:",
]
while len(overall_label) < max_rows:
overall_label.append("")
@ -2419,68 +2421,149 @@ def format_info_benchmark_json(config, benchmark_information):
format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])),
format_ops_tohuman(int(job_details[io_class]["iops"])),
format_bytes_tohuman(int(job_details[io_class]["io_bytes"])),
job_details["job_runtime"] / 1000,
job_details["usr_cpu"],
job_details["sys_cpu"],
job_details["ctx"],
job_details["majf"],
job_details["minf"],
str(job_details["job_runtime"] / 1000) + "s",
]
while len(overall_data) < max_rows:
overall_data.append("")
cpu_label = [
"Total:",
"User:",
"Sys:",
"OSD:",
"MON:",
]
while len(cpu_label) < max_rows:
cpu_label.append("")
cpu_data = [
(
benchmark_details[test]["avg_cpu_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
round(job_details["usr_cpu"], 2),
round(job_details["sys_cpu"], 2),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(cpu_data) < max_rows:
cpu_data.append("")
memory_label = [
"Total:",
"OSD:",
"MON:",
]
while len(memory_label) < max_rows:
memory_label.append("")
memory_data = [
(
benchmark_details[test]["avg_memory_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(memory_data) < max_rows:
memory_data.append("")
network_label = [
"Total:",
"Sent:",
"Recv:",
]
while len(network_label) < max_rows:
network_label.append("")
network_data = [
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["total"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["sent"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["recv"])
)
if benchmark_format > 1
else "N/A"
),
]
while len(network_data) < max_rows:
network_data.append("")
bandwidth_label = [
"Min:",
"Max:",
"Mean:",
"StdDev:",
"Samples:",
"",
"",
"",
"",
]
while len(bandwidth_label) < max_rows:
bandwidth_label.append("")
bandwidth_data = [
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024),
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024),
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024),
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024),
job_details[io_class]["bw_samples"],
"",
"",
"",
"",
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_min"])),
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_max"])),
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
str(job_details[io_class]["bw_samples"])
+ " / "
+ str(job_details[io_class]["iops_samples"]),
]
while len(bandwidth_data) < max_rows:
bandwidth_data.append("")
iops_data = [
format_ops_tohuman(int(job_details[io_class]["iops_min"])),
format_ops_tohuman(int(job_details[io_class]["iops_max"])),
format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
job_details[io_class]["iops_samples"],
"",
"",
"",
"",
lat_label = [
"Min:",
"Max:",
"Mean:",
"StdDev:",
]
while len(iops_data) < max_rows:
iops_data.append("")
while len(lat_label) < max_rows:
lat_label.append("")
lat_data = [
int(job_details[io_class]["lat_ns"]["min"]) / 1000,
int(job_details[io_class]["lat_ns"]["max"]) / 1000,
int(job_details[io_class]["lat_ns"]["mean"]) / 1000,
int(job_details[io_class]["lat_ns"]["stddev"]) / 1000,
"",
"",
"",
"",
"",
]
while len(lat_data) < max_rows:
lat_data.append("")
@ -2489,98 +2572,119 @@ def format_info_benchmark_json(config, benchmark_information):
lat_bucket_label = list()
lat_bucket_data = list()
for element in useful_latency_tree:
lat_bucket_label.append(element[0])
lat_bucket_data.append(element[1])
lat_bucket_label.append(element[0] + ":" if element[0] else "")
lat_bucket_data.append(round(float(element[1]), 2) if element[1] else "")
# Pad both latency bucket columns out to max_rows so each can be indexed per row
while len(lat_bucket_label) < max_rows:
    lat_bucket_label.append("")
while len(lat_bucket_data) < max_rows:
    # Pad the data list itself: appending to the label list here would never
    # change this loop's condition, so the loop could not terminate.
    lat_bucket_data.append("")
# Column default widths
overall_label_length = 0
overall_label_length = 5
overall_column_length = 0
bandwidth_label_length = 0
bandwidth_column_length = 11
iops_column_length = 4
latency_column_length = 12
cpu_label_length = 6
cpu_column_length = 0
memory_label_length = 6
memory_column_length = 0
network_label_length = 6
network_column_length = 6
bandwidth_label_length = 8
bandwidth_column_length = 0
latency_label_length = 7
latency_column_length = 0
latency_bucket_label_length = 0
latency_bucket_column_length = 0
# Column layout:
# General Bandwidth IOPS Latency Percentiles
# --------- ---------- -------- -------- ---------------
# Size Min Min Min A
# BW Max Max Max B
# IOPS Mean Mean Mean ...
# Runtime StdDev StdDev StdDev Z
# UsrCPU Samples Samples
# SysCPU
# CtxSw
# MajFault
# MinFault
# Overall CPU Memory Network Bandwidth/IOPS Latency Percentiles
# --------- ----- ------- -------- -------------- -------- ---------------
# BW Total Total Total Min Min A
# IOPS Usr OSD Send Max Max B
# Time Sys MON Recv Mean Mean ...
# Size OSD StdDev StdDev Z
# MON Samples
# Set column widths
for item in overall_label:
_item_length = len(str(item))
if _item_length > overall_label_length:
overall_label_length = _item_length
for item in overall_data:
_item_length = len(str(item))
if _item_length > overall_column_length:
overall_column_length = _item_length
test_name_length = len(nice_test_name_map[test])
if test_name_length > overall_label_length + overall_column_length:
_diff = test_name_length - (overall_label_length + overall_column_length)
overall_column_length += _diff
for item in bandwidth_label:
for item in cpu_data:
_item_length = len(str(item))
if _item_length > bandwidth_label_length:
bandwidth_label_length = _item_length
if _item_length > cpu_column_length:
cpu_column_length = _item_length
for item in memory_data:
_item_length = len(str(item))
if _item_length > memory_column_length:
memory_column_length = _item_length
for item in network_data:
_item_length = len(str(item))
if _item_length > network_column_length:
network_column_length = _item_length
for item in bandwidth_data:
_item_length = len(str(item))
if _item_length > bandwidth_column_length:
bandwidth_column_length = _item_length
for item in iops_data:
_item_length = len(str(item))
if _item_length > iops_column_length:
iops_column_length = _item_length
for item in lat_data:
_item_length = len(str(item))
if _item_length > latency_column_length:
latency_column_length = _item_length
for item in lat_bucket_label:
for item in lat_bucket_data:
_item_length = len(str(item))
if _item_length > latency_bucket_label_length:
latency_bucket_label_length = _item_length
if _item_length > latency_bucket_column_length:
latency_bucket_column_length = _item_length
# Top row (Headers)
ainformation.append(
"{bold}\
{overall_label: <{overall_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket} \
{end_bold}".format(
"{bold}{overall_label: <{overall_label_length}} {header_fill}{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
overall_label=nice_test_name_map[test],
overall_label_length=overall_label_length,
bandwidth_label="",
bandwidth_label_length=bandwidth_label_length,
bandwidth="Bandwidth/s",
bandwidth_length=bandwidth_column_length,
iops="IOPS",
iops_length=iops_column_length,
latency="Latency (μs)",
latency_length=latency_column_length,
latency_bucket_label="Latency Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length,
latency_bucket="",
header_fill="-"
* (
(MAX_CONTENT_WIDTH if MAX_CONTENT_WIDTH <= 120 else 120)
- len(nice_test_name_map[test])
- 4
),
)
)
ainformation.append(
"{bold}\
{overall_label: <{overall_label_length}} \
{cpu_label: <{cpu_label_length}} \
{memory_label: <{memory_label_length}} \
{network_label: <{network_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{latency_label: <{latency_label_length}} \
{latency_bucket_label: <{latency_bucket_label_length}}\
{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
overall_label="Overall",
overall_label_length=overall_label_length + overall_column_length + 1,
cpu_label="CPU (%)",
cpu_label_length=cpu_label_length + cpu_column_length + 1,
memory_label="Memory (%)",
memory_label_length=memory_label_length + memory_column_length + 1,
network_label="Network (bps)",
network_label_length=network_label_length + network_column_length + 1,
bandwidth_label="Bandwidth / IOPS",
bandwidth_label_length=bandwidth_label_length
+ bandwidth_column_length
+ 1,
latency_label="Latency (μs)",
latency_label_length=latency_label_length + latency_column_length + 1,
latency_bucket_label="Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length
+ latency_bucket_column_length,
)
)
@ -2588,14 +2692,20 @@ def format_info_benchmark_json(config, benchmark_information):
# Top row (Headers)
ainformation.append(
"{bold}\
{overall_label: >{overall_label_length}} \
{overall: <{overall_length}} \
{bandwidth_label: >{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: >{latency_bucket_label_length}} \
{latency_bucket} \
{overall_label: <{overall_label_length}} \
{overall: <{overall_length}} \
{cpu_label: <{cpu_label_length}} \
{cpu: <{cpu_length}} \
{memory_label: <{memory_label_length}} \
{memory: <{memory_length}} \
{network_label: <{network_label_length}} \
{network: <{network_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{latency_label: <{latency_label_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket}\
{end_bold}".format(
bold="",
end_bold="",
@ -2603,12 +2713,24 @@ def format_info_benchmark_json(config, benchmark_information):
overall_label_length=overall_label_length,
overall=overall_data[idx],
overall_length=overall_column_length,
cpu_label=cpu_label[idx],
cpu_label_length=cpu_label_length,
cpu=cpu_data[idx],
cpu_length=cpu_column_length,
memory_label=memory_label[idx],
memory_label_length=memory_label_length,
memory=memory_data[idx],
memory_length=memory_column_length,
network_label=network_label[idx],
network_label_length=network_label_length,
network=network_data[idx],
network_length=network_column_length,
bandwidth_label=bandwidth_label[idx],
bandwidth_label_length=bandwidth_label_length,
bandwidth=bandwidth_data[idx],
bandwidth_length=bandwidth_column_length,
iops=iops_data[idx],
iops_length=iops_column_length,
latency_label=lat_label[idx],
latency_label_length=latency_label_length,
latency=lat_data[idx],
latency_length=latency_column_length,
latency_bucket_label=lat_bucket_label[idx],
@ -2617,4 +2739,4 @@ def format_info_benchmark_json(config, benchmark_information):
)
)
return "\n".join(ainformation)
return "\n".join(ainformation) + "\n"

View File

@ -19,31 +19,34 @@
#
###############################################################################
import os
import psutil
import psycopg2
import psycopg2.extras
import subprocess
from datetime import datetime
from json import loads, dumps
from time import sleep
from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
# Define the current test format
TEST_FORMAT = 1
TEST_FORMAT = 2
# We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 8GB with a 4M block size
# 2. A sequential write test of 8GB with a 4M block size
# 3. A random read test of 8GB with a 4M block size
# 4. A random write test of 8GB with a 4M block size
# 5. A random read test of 8GB with a 256k block size
# 6. A random write test of 8GB with a 256k block size
# 7. A random read test of 8GB with a 4k block size
# 8. A random write test of 8GB with a 4k block size
# 1. A sequential read test of 64GB with a 4M block size
# 2. A sequential write test of 64GB with a 4M block size
# 3. A random read test of 64GB with a 4M block size
# 4. A random write test of 64GB with a 4M block size
# 5. A random read test of 64GB with a 256k block size
# 6. A random write test of 64GB with a 256k block size
# 7. A random read test of 64GB with a 4k block size
# 8. A random write test of 64GB with a 4k block size
# Taken together, these 8 results should give a very good indication of the overall storage performance
# for a variety of workloads.
test_matrix = {
@ -100,7 +103,7 @@ test_matrix = {
# Specify the benchmark volume name and size
benchmark_volume_name = "pvcbenchmark"
benchmark_volume_size = "8G"
benchmark_volume_size = "64G"
#
@ -226,7 +229,7 @@ def cleanup_benchmark_volume(
def run_benchmark_job(
test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
config, test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
test_spec = test_matrix[test]
log_info(None, f"Running test '{test}'")
@ -256,31 +259,165 @@ def run_benchmark_job(
)
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
# Run the fio command manually instead of using our run_os_command wrapper
# This will help us gather statistics about this node while it's running
process = subprocess.Popen(
fio_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait 15 seconds for the test to start
log_info(None, "Waiting 15 seconds for test resource stabilization")
sleep(15)
# Helper: combined CPU utilization of all processes sharing a given name
def get_cpu_utilization_by_name(process_name):
    """Sum the psutil ``cpu_percent`` value of every process named *process_name*.

    Iterates the process list once, requesting only the ``name`` and
    ``cpu_percent`` attributes. Returns 0 when no process has that name.
    """
    return sum(
        candidate.info["cpu_percent"]
        for candidate in psutil.process_iter(["name", "cpu_percent"])
        if candidate.info["name"] == process_name
    )
# Helper: combined memory utilization of all processes sharing a given name
def get_memory_utilization_by_name(process_name):
    """Sum the psutil ``memory_percent`` value of every process named *process_name*.

    Iterates the process list once, requesting only the ``name`` and
    ``memory_percent`` attributes. Returns 0 when no process has that name.
    """
    return sum(
        candidate.info["memory_percent"]
        for candidate in psutil.process_iter(["name", "memory_percent"])
        if candidate.info["name"] == process_name
    )
# Set up function to get network traffic utilization in bps
def get_network_traffic_bps(interface, duration=1):
    """Measure the average traffic on *interface* over *duration* seconds.

    Reads the per-NIC byte counters from psutil, sleeps for *duration*
    seconds, reads them again, and converts the byte deltas into bits per
    second.

    Args:
        interface: Name of the network interface to sample.
        duration: Length of the sampling window in seconds.

    Returns:
        A 3-tuple ``(sent_bps, recv_bps, total_bps)``. When the interface is
        missing from either counter sample, ``(0, 0, 0)`` is returned so the
        result always unpacks into three numeric values that can be summed.
    """
    # Get initial network counters
    stats_start = psutil.net_io_counters(pernic=True).get(interface)
    if stats_start is None:
        # Unknown interface: keep the same 3-value shape as the normal path
        return 0, 0, 0

    # Wait for the specified duration
    sleep(duration)

    # Get final network counters
    stats_end = psutil.net_io_counters(pernic=True).get(interface)
    if stats_end is None:
        # The interface disappeared while we were sampling
        return 0, 0, 0

    # Calculate bytes per second
    bytes_sent_per_sec = (stats_end.bytes_sent - stats_start.bytes_sent) / duration
    bytes_recv_per_sec = (stats_end.bytes_recv - stats_start.bytes_recv) / duration

    # Convert to bits per second (bps)
    bits_sent_per_sec = bytes_sent_per_sec * 8
    bits_recv_per_sec = bytes_recv_per_sec * 8
    bits_total_per_sec = bits_sent_per_sec + bits_recv_per_sec

    return bits_sent_per_sec, bits_recv_per_sec, bits_total_per_sec
log_info(None, f"Starting system resource polling for test '{test}'")
storage_interface = config["storage_dev"]
total_cpus = psutil.cpu_count(logical=True)
ticks = 1
osd_cpu_utilization = 0
osd_memory_utilization = 0
mon_cpu_utilization = 0
mon_memory_utilization = 0
total_cpu_utilization = 0
total_memory_utilization = 0
storage_sent_bps = 0
storage_recv_bps = 0
storage_total_bps = 0
while process.poll() is None:
# Do collection of statistics like network bandwidth and cpu utilization
current_osd_cpu_utilization = get_cpu_utilization_by_name("ceph-osd")
current_osd_memory_utilization = get_memory_utilization_by_name("ceph-osd")
current_mon_cpu_utilization = get_cpu_utilization_by_name("ceph-mon")
current_mon_memory_utilization = get_memory_utilization_by_name("ceph-mon")
current_total_cpu_utilization = psutil.cpu_percent(interval=1)
current_total_memory_utilization = psutil.virtual_memory().percent
(
current_storage_sent_bps,
current_storage_recv_bps,
current_storage_total_bps,
) = get_network_traffic_bps(storage_interface)
# Recheck if the process is done yet; if it's not, we add the values and increase the ticks
# This helps ensure that if the process finishes earlier than the longer polls above,
# this particular tick isn't counted which can skew the average
if process.poll() is None:
osd_cpu_utilization += current_osd_cpu_utilization
osd_memory_utilization += current_osd_memory_utilization
mon_cpu_utilization += current_mon_cpu_utilization
mon_memory_utilization += current_mon_memory_utilization
total_cpu_utilization += current_total_cpu_utilization
total_memory_utilization += current_total_memory_utilization
storage_sent_bps += current_storage_sent_bps
storage_recv_bps += current_storage_recv_bps
storage_total_bps += current_storage_total_bps
ticks += 1
# Get the 1-minute load average and CPU utilization, which covers the test duration
load1, _, _ = os.getloadavg()
load1 = round(load1, 2)
# Calculate the average CPU utilization values over the runtime
# Divide the OSD and MON CPU utilization by the total number of CPU cores, because
# the total is divided this way
avg_osd_cpu_utilization = round(osd_cpu_utilization / ticks / total_cpus, 2)
avg_osd_memory_utilization = round(osd_memory_utilization / ticks, 2)
avg_mon_cpu_utilization = round(mon_cpu_utilization / ticks / total_cpus, 2)
avg_mon_memory_utilization = round(mon_memory_utilization / ticks, 2)
avg_total_cpu_utilization = round(total_cpu_utilization / ticks, 2)
avg_total_memory_utilization = round(total_memory_utilization / ticks, 2)
avg_storage_sent_bps = round(storage_sent_bps / ticks, 2)
avg_storage_recv_bps = round(storage_recv_bps / ticks, 2)
avg_storage_total_bps = round(storage_total_bps / ticks, 2)
stdout, stderr = process.communicate()
retcode = process.returncode
resource_data = {
"avg_cpu_util_percent": {
"total": avg_total_cpu_utilization,
"ceph-mon": avg_mon_cpu_utilization,
"ceph-osd": avg_osd_cpu_utilization,
},
"avg_memory_util_percent": {
"total": avg_total_memory_utilization,
"ceph-mon": avg_mon_memory_utilization,
"ceph-osd": avg_osd_memory_utilization,
},
"avg_network_util_bps": {
"sent": avg_storage_sent_bps,
"recv": avg_storage_recv_bps,
"total": avg_storage_total_bps,
},
}
try:
jstdout = loads(stdout)
if retcode:
raise
except Exception:
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}': {stderr}",
)
return None, None
return jstdout
return resource_data, jstdout
def worker_run_benchmark(zkhandler, celery, config, pool):
def worker_run_benchmark(zkhandler, celery, config, pool, name):
# Phase 0 - connect to databases
cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}"
if not name:
cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}"
else:
job_name = name
current_stage = 0
total_stages = 13
@ -358,7 +495,8 @@ def worker_run_benchmark(zkhandler, celery, config, pool):
total=total_stages,
)
results[test] = run_benchmark_job(
resource_data, fio_data = run_benchmark_job(
config,
test,
pool,
job_name=job_name,
@ -366,6 +504,25 @@ def worker_run_benchmark(zkhandler, celery, config, pool):
db_cur=db_cur,
zkhandler=zkhandler,
)
if resource_data is None or fio_data is None:
cleanup_benchmark_volume(
pool,
job_name=job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}'",
)
results[test] = {**resource_data, **fio_data}
# Phase 3 - cleanup
current_stage += 1

View File

@ -560,7 +560,21 @@ def getVolumeInformation(zkhandler, pool, volume):
return volume_information
def add_volume(zkhandler, pool, name, size, force_flag=False):
def scan_volume(zkhandler, pool, name):
    """Refresh the stored stats of an RBD volume from ``rbd info``.

    Runs ``rbd info --format json`` against ``pool/name`` and writes the
    command's standard output, unmodified, to that volume's ``volume.stats``
    key through *zkhandler*. The return code and stderr of the command are
    not inspected.
    """
    rbd_info_cmd = "rbd info --format json {}/{}".format(pool, name)
    _retcode, info_output, _stderr = common.run_os_command(rbd_info_cmd)

    # Store the raw command output as the volume's stats
    stats_key = ("volume.stats", f"{pool}/{name}")
    zkhandler.write([(stats_key, info_output)])
def add_volume(zkhandler, pool, name, size, force_flag=False, zk_only=False):
# 1. Verify the size of the volume
pool_information = getPoolInformation(zkhandler, pool)
size_bytes = format_bytes_fromhuman(size)
@ -592,27 +606,28 @@ def add_volume(zkhandler, pool, name, size, force_flag=False):
)
# 2. Create the volume
retcode, stdout, stderr = common.run_os_command(
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
)
if retcode:
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr)
# 2. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
# zk_only flag skips actually creating the volume - this would be done by some other mechanism
if not zk_only:
retcode, stdout, stderr = common.run_os_command(
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
)
if retcode:
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(
name, stderr
)
# 3. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("volume.stats", f"{pool}/{name}"), ""),
(("snapshot", f"{pool}/{name}"), ""),
]
)
# 4. Scan the volume stats
scan_volume(zkhandler, pool, name)
return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
)
@ -662,21 +677,18 @@ def clone_volume(zkhandler, pool, name_src, name_new, force_flag=False):
),
)
# 3. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name_new)
)
volstats = stdout
# 4. Add the new volume to Zookeeper
# 3. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name_new}"), ""),
(("volume.stats", f"{pool}/{name_new}"), volstats),
(("volume.stats", f"{pool}/{name_new}"), ""),
(("snapshot", f"{pool}/{name_new}"), ""),
]
)
# 4. Scan the volume stats
scan_volume(zkhandler, pool, name_new)
return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
name_src, name_new, pool
)
@ -761,20 +773,8 @@ def resize_volume(zkhandler, pool, name, size, force_flag=False):
except Exception:
pass
# 4. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
# 5. Update the volume in Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("snapshot", f"{pool}/{name}"), ""),
]
)
# 4. Scan the volume stats
scan_volume(zkhandler, pool, name)
return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
@ -807,18 +807,8 @@ def rename_volume(zkhandler, pool, name, new_name):
]
)
# 3. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, new_name)
)
volstats = stdout
# 4. Update the volume stats in Zookeeper
zkhandler.write(
[
(("volume.stats", f"{pool}/{new_name}"), volstats),
]
)
# 3. Scan the volume stats
scan_volume(zkhandler, pool, new_name)
return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
name, new_name, pool

View File

@ -262,6 +262,22 @@ def getClusterInformation(zkhandler):
# Get cluster maintenance state
maintenance_state = zkhandler.read("base.config.maintenance")
# Prepare cluster total values
cluster_total_node_memory = 0
cluster_total_used_memory = 0
cluster_total_free_memory = 0
cluster_total_allocated_memory = 0
cluster_total_provisioned_memory = 0
cluster_total_average_memory_utilization = 0
cluster_total_cpu_cores = 0
cluster_total_cpu_load = 0
cluster_total_average_cpu_utilization = 0
cluster_total_allocated_cores = 0
cluster_total_osd_space = 0
cluster_total_used_space = 0
cluster_total_free_space = 0
cluster_total_average_osd_utilization = 0
# Get primary node
maintenance_state, primary_node = zkhandler.read_many(
[
@ -276,19 +292,36 @@ def getClusterInformation(zkhandler):
# Get the list of Nodes
node_list = zkhandler.children("base.node")
node_count = len(node_list)
# Get the daemon and domain states of all Nodes
# Get the information of all Nodes
node_state_reads = list()
node_memory_reads = list()
node_cpu_reads = list()
for node in node_list:
node_state_reads += [
("node.state.daemon", node),
("node.state.domain", node),
]
node_memory_reads += [
("node.memory.total", node),
("node.memory.used", node),
("node.memory.free", node),
("node.memory.allocated", node),
("node.memory.provisioned", node),
]
node_cpu_reads += [
("node.data.static", node),
("node.vcpu.allocated", node),
("node.cpu.load", node),
]
all_node_states = zkhandler.read_many(node_state_reads)
all_node_memory = zkhandler.read_many(node_memory_reads)
all_node_cpu = zkhandler.read_many(node_cpu_reads)
# Parse out the Node states
node_data = list()
formatted_node_states = {"total": node_count}
for nidx, node in enumerate(node_list):
# Split the large list of return values by the IDX of this node
# Split the large list of return values by the IDX of this node (states)
# Each node result is 2 fields long
pos_start = nidx * 2
pos_end = nidx * 2 + 2
@ -308,6 +341,46 @@ def getClusterInformation(zkhandler):
else:
formatted_node_states[node_state] = 1
# Split the large list of return values by the IDX of this node (memory)
# Each node result is 5 fields long
pos_start = nidx * 5
pos_end = nidx * 5 + 5
(
node_memory_total,
node_memory_used,
node_memory_free,
node_memory_allocated,
node_memory_provisioned,
) = tuple(all_node_memory[pos_start:pos_end])
cluster_total_node_memory += int(node_memory_total)
cluster_total_used_memory += int(node_memory_used)
cluster_total_free_memory += int(node_memory_free)
cluster_total_allocated_memory += int(node_memory_allocated)
cluster_total_provisioned_memory += int(node_memory_provisioned)
# Split the large list of return values by the IDX of this node (cpu)
# Each node result is 3 fields long
pos_start = nidx * 3
pos_end = nidx * 3 + 3
node_static_data, node_vcpu_allocated, node_cpu_load = tuple(
all_node_cpu[pos_start:pos_end]
)
cluster_total_cpu_cores += int(node_static_data.split()[0])
cluster_total_cpu_load += round(float(node_cpu_load), 2)
cluster_total_allocated_cores += int(node_vcpu_allocated)
cluster_total_average_memory_utilization = (
(round((cluster_total_used_memory / cluster_total_node_memory) * 100, 2))
if cluster_total_node_memory > 0
else 0.00
)
cluster_total_average_cpu_utilization = (
(round((cluster_total_cpu_load / cluster_total_cpu_cores) * 100, 2))
if cluster_total_cpu_cores > 0
else 0.00
)
# Get the list of VMs
vm_list = zkhandler.children("base.domain")
vm_count = len(vm_list)
@ -380,6 +453,18 @@ def getClusterInformation(zkhandler):
else:
formatted_osd_states[osd_state] = 1
# Add the OSD utilization
cluster_total_osd_space += int(osd_stats["kb"])
cluster_total_used_space += int(osd_stats["kb_used"])
cluster_total_free_space += int(osd_stats["kb_avail"])
cluster_total_average_osd_utilization += float(osd_stats["utilization"])
cluster_total_average_osd_utilization = (
(round(cluster_total_average_osd_utilization / len(ceph_osd_list), 2))
if ceph_osd_list
else 0.00
)
# Get the list of Networks
network_list = zkhandler.children("base.network")
network_count = len(network_list)
@ -424,6 +509,28 @@ def getClusterInformation(zkhandler):
"pools": ceph_pool_count,
"volumes": ceph_volume_count,
"snapshots": ceph_snapshot_count,
"resources": {
"memory": {
"total": cluster_total_node_memory,
"free": cluster_total_free_memory,
"used": cluster_total_used_memory,
"allocated": cluster_total_allocated_memory,
"provisioned": cluster_total_provisioned_memory,
"utilization": cluster_total_average_memory_utilization,
},
"cpu": {
"total": cluster_total_cpu_cores,
"load": cluster_total_cpu_load,
"allocated": cluster_total_allocated_cores,
"utilization": cluster_total_average_cpu_utilization,
},
"disk": {
"total": cluster_total_osd_space,
"used": cluster_total_used_space,
"free": cluster_total_free_space,
"utilization": cluster_total_average_osd_utilization,
},
},
"detail": {
"node": node_data,
"vm": vm_data,

2
debian/control vendored
View File

@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
Package: pvc-daemon-api
Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Depends: systemd, pvc-daemon-common, gunicorn, python3-gunicorn, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.

View File

@ -96,12 +96,12 @@ def create_vm(
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
def storage_benchmark(self, pool=None, name=None, run_on="primary"):
@ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool)
def run_storage_benchmark(zkhandler, self, pool, name):
return worker_run_benchmark(zkhandler, self, config, pool, name)
return run_storage_benchmark(self, pool)
return run_storage_benchmark(self, pool, name)
@celery.task(name="cluster.autobackup", bind=True, routing_key="run_on")