Compare commits

..

4 Commits

Author SHA1 Message Date
Joshua Boniface 41cd34ba4d Allow specifying job names for benchmarks 2024-09-18 14:55:12 -04:00
Joshua Boniface 736762901c Update benchmarks to include resource utilization
Adds additional polled information on node cpu, memory, and network
bandwidth for the node running the test. This should provide additional
useful information about the results of the test.

Also bumps the test format to 2 to ensure clients can handle the changes
properly.
2024-09-18 14:32:03 -04:00
Joshua Boniface ecb812ccac Update linting for pvcapid recent changes 2024-09-18 10:18:50 -04:00
Joshua Boniface a2e5df9f6d Add support for Gunicorn execution
Modifies pvcapid to run under Gunicorn when in non-debug mode, instead
of the Flask development server. This is proper practice for one, and
also helps increase performance slightly in some workloads (file uploads
mainly).
2024-09-09 13:20:03 -04:00
8 changed files with 541 additions and 271 deletions

View File

@ -19,6 +19,13 @@
# #
############################################################################### ###############################################################################
import pvcapid.Daemon # noqa: F401 import sys
from os import path
# Ensure current directory (/usr/share/pvc) is in the system path for Gunicorn
current_dir = path.dirname(path.abspath(__file__))
sys.path.append(current_dir)
import pvcapid.Daemon # noqa: F401, E402
pvcapid.Daemon.entrypoint() pvcapid.Daemon.entrypoint()

View File

@ -19,15 +19,13 @@
# #
############################################################################### ###############################################################################
import subprocess
from ssl import SSLContext, TLSVersion from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg import daemon_lib.config as cfg
# Daemon version # Daemon version
version = "0.9.100" version = "0.9.100~git-73c0834f"
# API version # API version
API_VERSION = 1.0 API_VERSION = 1.0
@ -53,7 +51,6 @@ def strtobool(stringv):
# Configuration Parsing # Configuration Parsing
########################################################## ##########################################################
# Get our configuration # Get our configuration
config = cfg.get_configuration() config = cfg.get_configuration()
config["daemon_name"] = "pvcapid" config["daemon_name"] = "pvcapid"
@ -61,22 +58,16 @@ config["daemon_version"] = version
########################################################## ##########################################################
# Entrypoint # Flask App Creation for Gunicorn
########################################################## ##########################################################
def entrypoint(): def create_app():
import pvcapid.flaskapi as pvc_api # noqa: E402 """
Create and return the Flask app and SSL context if necessary.
if config["api_ssl_enabled"]: """
context = SSLContext() # Import the Flask app from pvcapid.flaskapi after adjusting the path
context.minimum_version = TLSVersion.TLSv1 import pvcapid.flaskapi as pvc_api
context.get_ca_certs()
context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
context = None
# Print our startup messages # Print our startup messages
print("") print("")
@ -102,9 +93,69 @@ def entrypoint():
print("") print("")
pvc_api.celery_startup() pvc_api.celery_startup()
pvc_api.app.run(
return pvc_api.app
##########################################################
# Entrypoint
##########################################################
def entrypoint():
if config["debug"]:
app = create_app()
if config["api_ssl_enabled"]:
ssl_context = SSLContext()
ssl_context.minimum_version = TLSVersion.TLSv1
ssl_context.get_ca_certs()
ssl_context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
ssl_context = None
app.run(
config["api_listen_address"], config["api_listen_address"],
config["api_listen_port"], config["api_listen_port"],
threaded=True, threaded=True,
ssl_context=context, ssl_context=ssl_context,
) )
else:
# Build the command to run Gunicorn
gunicorn_cmd = [
"gunicorn",
"--workers",
"1",
"--threads",
"8",
"--timeout",
"86400",
"--bind",
"{}:{}".format(config["api_listen_address"], config["api_listen_port"]),
"pvcapid.Daemon:create_app()",
"--log-level",
"info",
"--access-logfile",
"-",
"--error-logfile",
"-",
]
if config["api_ssl_enabled"]:
gunicorn_cmd += [
"--certfile",
config["api_ssl_cert_file"],
"--keyfile",
config["api_ssl_key_file"],
]
# Run Gunicorn
try:
subprocess.run(gunicorn_cmd)
except KeyboardInterrupt:
exit(0)
except Exception as e:
print(e)
exit(1)

View File

@ -5058,95 +5058,7 @@ class API_Storage_Ceph_Benchmark(Resource):
description: The PVC benchmark format of the results description: The PVC benchmark format of the results
benchmark_result: benchmark_result:
type: object type: object
description: A format 0 test result description: A benchmark test result; format not documented due to complexity
properties:
test_name:
type: object
properties:
overall:
type: object
properties:
iosize:
type: string (integer)
description: The total size of the benchmark data
bandwidth:
type: string (integer)
description: The average bandwidth (KiB/s)
iops:
type: string (integer)
description: The average IOPS
runtime:
type: string (integer)
description: The total test time in milliseconds
latency:
type: object
properties:
min:
type: string (integer)
description: The minimum latency measurement
max:
type: string (integer)
description: The maximum latency measurement
mean:
type: string (float)
description: The mean latency measurement
stdev:
type: string (float)
description: The standard deviation of latency
bandwidth:
type: object
properties:
min:
type: string (integer)
description: The minimum bandwidth (KiB/s) measurement
max:
type: string (integer)
description: The maximum bandwidth (KiB/s) measurement
mean:
type: string (float)
description: The mean bandwidth (KiB/s) measurement
stdev:
type: string (float)
description: The standard deviation of bandwidth
numsamples:
type: string (integer)
description: The number of samples taken during the test
iops:
type: object
properties:
min:
type: string (integer)
description: The minimum IOPS measurement
max:
type: string (integer)
description: The maximum IOPS measurement
mean:
type: string (float)
description: The mean IOPS measurement
stdev:
type: string (float)
description: The standard deviation of IOPS
numsamples:
type: string (integer)
description: The number of samples taken during the test
cpu:
type: object
properties:
user:
type: string (float percentage)
description: The percentage of test time spent in user space
system:
type: string (float percentage)
description: The percentage of test time spent in system (kernel) space
ctxsw:
type: string (integer)
description: The number of context switches during the test
majfault:
type: string (integer)
description: The number of major page faults during the test
minfault:
type: string (integer)
description: The number of minor page faults during the test
""" """
return list_benchmarks(config, reqargs.get("job", None)) return list_benchmarks(config, reqargs.get("job", None))
@ -5157,6 +5069,10 @@ class API_Storage_Ceph_Benchmark(Resource):
"required": True, "required": True,
"helptext": "A valid pool must be specified.", "helptext": "A valid pool must be specified.",
}, },
{
"name": "name",
"required": False,
},
] ]
) )
@Authenticator @Authenticator
@ -5172,6 +5088,11 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string type: string
required: true required: true
description: The PVC storage pool to benchmark description: The PVC storage pool to benchmark
- in: query
name: name
type: string
required: false
description: An optional override name for the job
responses: responses:
200: 200:
description: OK description: OK
@ -5189,7 +5110,10 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400 }, 400
task = run_celery_task( task = run_celery_task(
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary" "storage.benchmark",
pool=reqargs.get("pool", None),
name=reqargs.get("name", None),
run_on="primary",
) )
return ( return (
{ {

View File

@ -3755,6 +3755,13 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.") @click.command(name="run", short_help="Run a storage benchmark.")
@connection_req @connection_req
@click.argument("pool") @click.argument("pool")
@click.option(
"--name",
"name",
default=None,
show_default=False,
help="Use a custom name for the job",
)
@click.option( @click.option(
"--wait/--no-wait", "--wait/--no-wait",
"wait_flag", "wait_flag",
@ -3766,12 +3773,14 @@ def cli_storage_benchmark():
@confirm_opt( @confirm_opt(
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue" "Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
) )
def cli_storage_benchmark_run(pool, wait_flag): def cli_storage_benchmark_run(pool, name, wait_flag):
""" """
Run a storage benchmark on POOL in the background. Run a storage benchmark on POOL in the background.
""" """
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag) retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(
CLI_CONFIG, pool, name, wait_flag
)
if retcode and wait_flag: if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)

View File

@ -30,6 +30,7 @@ from requests_toolbelt.multipart.encoder import (
import pvc.lib.ansiprint as ansiprint import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
from pvc.cli.helpers import MAX_CONTENT_WIDTH
# #
# Supplemental functions # Supplemental functions
@ -1724,15 +1725,17 @@ def format_list_snapshot(config, snapshot_list):
# #
# Benchmark functions # Benchmark functions
# #
def ceph_benchmark_run(config, pool, wait_flag): def ceph_benchmark_run(config, pool, name, wait_flag):
""" """
Run a storage benchmark against {pool} Run a storage benchmark against {pool}
API endpoint: POST /api/v1/storage/ceph/benchmark API endpoint: POST /api/v1/storage/ceph/benchmark
API arguments: pool={pool} API arguments: pool={pool}, name={name}
API schema: {message} API schema: {message}
""" """
params = {"pool": pool} params = {"pool": pool}
if name:
params["name"] = name
response = call_api(config, "post", "/storage/ceph/benchmark", params=params) response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
return get_wait_retdata(response, wait_flag) return get_wait_retdata(response, wait_flag)
@ -1804,7 +1807,7 @@ def get_benchmark_list_results(benchmark_format, benchmark_data):
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy(
benchmark_data benchmark_data
) )
elif benchmark_format == 1: elif benchmark_format == 1 or benchmark_format == 2:
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json(
benchmark_data benchmark_data
) )
@ -2006,6 +2009,7 @@ def format_info_benchmark(config, benchmark_information):
benchmark_matrix = { benchmark_matrix = {
0: format_info_benchmark_legacy, 0: format_info_benchmark_legacy,
1: format_info_benchmark_json, 1: format_info_benchmark_json,
2: format_info_benchmark_json,
} }
benchmark_version = benchmark_information[0]["test_format"] benchmark_version = benchmark_information[0]["test_format"]
@ -2340,12 +2344,15 @@ def format_info_benchmark_json(config, benchmark_information):
if benchmark_information["benchmark_result"] == "Running": if benchmark_information["benchmark_result"] == "Running":
return "Benchmark test is still running." return "Benchmark test is still running."
benchmark_format = benchmark_information["test_format"]
benchmark_details = benchmark_information["benchmark_result"] benchmark_details = benchmark_information["benchmark_result"]
# Format a nice output; do this line-by-line then concat the elements at the end # Format a nice output; do this line-by-line then concat the elements at the end
ainformation = [] ainformation = []
ainformation.append( ainformation.append(
"{}Storage Benchmark details:{}".format(ansiprint.bold(), ansiprint.end()) "{}Storage Benchmark details (format {}):{}".format(
ansiprint.bold(), benchmark_format, ansiprint.end()
)
) )
nice_test_name_map = { nice_test_name_map = {
@ -2393,7 +2400,7 @@ def format_info_benchmark_json(config, benchmark_information):
if element[1] != 0: if element[1] != 0:
useful_latency_tree.append(element) useful_latency_tree.append(element)
max_rows = 9 max_rows = 5
if len(useful_latency_tree) > 9: if len(useful_latency_tree) > 9:
max_rows = len(useful_latency_tree) max_rows = len(useful_latency_tree)
elif len(useful_latency_tree) < 9: elif len(useful_latency_tree) < 9:
@ -2402,15 +2409,10 @@ def format_info_benchmark_json(config, benchmark_information):
# Format the static data # Format the static data
overall_label = [ overall_label = [
"Overall BW/s:", "BW/s:",
"Overall IOPS:", "IOPS:",
"Total I/O:", "I/O:",
"Runtime (s):", "Time:",
"User CPU %:",
"System CPU %:",
"Ctx Switches:",
"Major Faults:",
"Minor Faults:",
] ]
while len(overall_label) < max_rows: while len(overall_label) < max_rows:
overall_label.append("") overall_label.append("")
@ -2419,68 +2421,149 @@ def format_info_benchmark_json(config, benchmark_information):
format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])), format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])),
format_ops_tohuman(int(job_details[io_class]["iops"])), format_ops_tohuman(int(job_details[io_class]["iops"])),
format_bytes_tohuman(int(job_details[io_class]["io_bytes"])), format_bytes_tohuman(int(job_details[io_class]["io_bytes"])),
job_details["job_runtime"] / 1000, str(job_details["job_runtime"] / 1000) + "s",
job_details["usr_cpu"],
job_details["sys_cpu"],
job_details["ctx"],
job_details["majf"],
job_details["minf"],
] ]
while len(overall_data) < max_rows: while len(overall_data) < max_rows:
overall_data.append("") overall_data.append("")
cpu_label = [
"Total:",
"User:",
"Sys:",
"OSD:",
"MON:",
]
while len(cpu_label) < max_rows:
cpu_label.append("")
cpu_data = [
(
benchmark_details[test]["avg_cpu_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
round(job_details["usr_cpu"], 2),
round(job_details["sys_cpu"], 2),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(cpu_data) < max_rows:
cpu_data.append("")
memory_label = [
"Total:",
"OSD:",
"MON:",
]
while len(memory_label) < max_rows:
memory_label.append("")
memory_data = [
(
benchmark_details[test]["avg_memory_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(memory_data) < max_rows:
memory_data.append("")
network_label = [
"Total:",
"Sent:",
"Recv:",
]
while len(network_label) < max_rows:
network_label.append("")
network_data = [
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["total"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["sent"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["recv"])
)
if benchmark_format > 1
else "N/A"
),
]
while len(network_data) < max_rows:
network_data.append("")
bandwidth_label = [ bandwidth_label = [
"Min:", "Min:",
"Max:", "Max:",
"Mean:", "Mean:",
"StdDev:", "StdDev:",
"Samples:", "Samples:",
"",
"",
"",
"",
] ]
while len(bandwidth_label) < max_rows: while len(bandwidth_label) < max_rows:
bandwidth_label.append("") bandwidth_label.append("")
bandwidth_data = [ bandwidth_data = [
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024), format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024)
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024), + " / "
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024), + format_ops_tohuman(int(job_details[io_class]["iops_min"])),
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024), format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024)
job_details[io_class]["bw_samples"], + " / "
"", + format_ops_tohuman(int(job_details[io_class]["iops_max"])),
"", format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024)
"", + " / "
"", + format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
str(job_details[io_class]["bw_samples"])
+ " / "
+ str(job_details[io_class]["iops_samples"]),
] ]
while len(bandwidth_data) < max_rows: while len(bandwidth_data) < max_rows:
bandwidth_data.append("") bandwidth_data.append("")
iops_data = [ lat_label = [
format_ops_tohuman(int(job_details[io_class]["iops_min"])), "Min:",
format_ops_tohuman(int(job_details[io_class]["iops_max"])), "Max:",
format_ops_tohuman(int(job_details[io_class]["iops_mean"])), "Mean:",
format_ops_tohuman(int(job_details[io_class]["iops_stddev"])), "StdDev:",
job_details[io_class]["iops_samples"],
"",
"",
"",
"",
] ]
while len(iops_data) < max_rows: while len(lat_label) < max_rows:
iops_data.append("") lat_label.append("")
lat_data = [ lat_data = [
int(job_details[io_class]["lat_ns"]["min"]) / 1000, int(job_details[io_class]["lat_ns"]["min"]) / 1000,
int(job_details[io_class]["lat_ns"]["max"]) / 1000, int(job_details[io_class]["lat_ns"]["max"]) / 1000,
int(job_details[io_class]["lat_ns"]["mean"]) / 1000, int(job_details[io_class]["lat_ns"]["mean"]) / 1000,
int(job_details[io_class]["lat_ns"]["stddev"]) / 1000, int(job_details[io_class]["lat_ns"]["stddev"]) / 1000,
"",
"",
"",
"",
"",
] ]
while len(lat_data) < max_rows: while len(lat_data) < max_rows:
lat_data.append("") lat_data.append("")
@ -2489,98 +2572,119 @@ def format_info_benchmark_json(config, benchmark_information):
lat_bucket_label = list() lat_bucket_label = list()
lat_bucket_data = list() lat_bucket_data = list()
for element in useful_latency_tree: for element in useful_latency_tree:
lat_bucket_label.append(element[0]) lat_bucket_label.append(element[0] + ":" if element[0] else "")
lat_bucket_data.append(element[1]) lat_bucket_data.append(round(float(element[1]), 2) if element[1] else "")
while len(lat_bucket_label) < max_rows:
lat_bucket_label.append("")
while len(lat_bucket_data) < max_rows:
lat_bucket_label.append("")
# Column default widths # Column default widths
overall_label_length = 0 overall_label_length = 5
overall_column_length = 0 overall_column_length = 0
bandwidth_label_length = 0 cpu_label_length = 6
bandwidth_column_length = 11 cpu_column_length = 0
iops_column_length = 4 memory_label_length = 6
latency_column_length = 12 memory_column_length = 0
network_label_length = 6
network_column_length = 6
bandwidth_label_length = 8
bandwidth_column_length = 0
latency_label_length = 7
latency_column_length = 0
latency_bucket_label_length = 0 latency_bucket_label_length = 0
latency_bucket_column_length = 0
# Column layout: # Column layout:
# General Bandwidth IOPS Latency Percentiles # Overall CPU Memory Network Bandwidth/IOPS Latency Percentiles
# --------- ---------- -------- -------- --------------- # --------- ----- ------- -------- -------------- -------- ---------------
# Size Min Min Min A # BW Total Total Total Min Min A
# BW Max Max Max B # IOPS Usr OSD Send Max Max B
# IOPS Mean Mean Mean ... # Time Sys MON Recv Mean Mean ...
# Runtime StdDev StdDev StdDev Z # Size OSD StdDev StdDev Z
# UsrCPU Samples Samples # MON Samples
# SysCPU
# CtxSw
# MajFault
# MinFault
# Set column widths # Set column widths
for item in overall_label:
_item_length = len(str(item))
if _item_length > overall_label_length:
overall_label_length = _item_length
for item in overall_data: for item in overall_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > overall_column_length: if _item_length > overall_column_length:
overall_column_length = _item_length overall_column_length = _item_length
test_name_length = len(nice_test_name_map[test]) for item in cpu_data:
if test_name_length > overall_label_length + overall_column_length:
_diff = test_name_length - (overall_label_length + overall_column_length)
overall_column_length += _diff
for item in bandwidth_label:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > bandwidth_label_length: if _item_length > cpu_column_length:
bandwidth_label_length = _item_length cpu_column_length = _item_length
for item in memory_data:
_item_length = len(str(item))
if _item_length > memory_column_length:
memory_column_length = _item_length
for item in network_data:
_item_length = len(str(item))
if _item_length > network_column_length:
network_column_length = _item_length
for item in bandwidth_data: for item in bandwidth_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > bandwidth_column_length: if _item_length > bandwidth_column_length:
bandwidth_column_length = _item_length bandwidth_column_length = _item_length
for item in iops_data:
_item_length = len(str(item))
if _item_length > iops_column_length:
iops_column_length = _item_length
for item in lat_data: for item in lat_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_column_length: if _item_length > latency_column_length:
latency_column_length = _item_length latency_column_length = _item_length
for item in lat_bucket_label: for item in lat_bucket_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_bucket_label_length: if _item_length > latency_bucket_column_length:
latency_bucket_label_length = _item_length latency_bucket_column_length = _item_length
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}\ "{bold}{overall_label: <{overall_label_length}} {header_fill}{end_bold}".format(
{overall_label: <{overall_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket} \
{end_bold}".format(
bold=ansiprint.bold(), bold=ansiprint.bold(),
end_bold=ansiprint.end(), end_bold=ansiprint.end(),
overall_label=nice_test_name_map[test], overall_label=nice_test_name_map[test],
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
bandwidth_label="", header_fill="-"
bandwidth_label_length=bandwidth_label_length, * (
bandwidth="Bandwidth/s", (MAX_CONTENT_WIDTH if MAX_CONTENT_WIDTH <= 120 else 120)
bandwidth_length=bandwidth_column_length, - len(nice_test_name_map[test])
iops="IOPS", - 4
iops_length=iops_column_length, ),
latency="Latency (μs)", )
latency_length=latency_column_length, )
latency_bucket_label="Latency Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length, ainformation.append(
latency_bucket="", "{bold}\
{overall_label: <{overall_label_length}} \
{cpu_label: <{cpu_label_length}} \
{memory_label: <{memory_label_length}} \
{network_label: <{network_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{latency_label: <{latency_label_length}} \
{latency_bucket_label: <{latency_bucket_label_length}}\
{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
overall_label="Overall",
overall_label_length=overall_label_length + overall_column_length + 1,
cpu_label="CPU (%)",
cpu_label_length=cpu_label_length + cpu_column_length + 1,
memory_label="Memory (%)",
memory_label_length=memory_label_length + memory_column_length + 1,
network_label="Network (bps)",
network_label_length=network_label_length + network_column_length + 1,
bandwidth_label="Bandwidth / IOPS",
bandwidth_label_length=bandwidth_label_length
+ bandwidth_column_length
+ 1,
latency_label="Latency (μs)",
latency_label_length=latency_label_length + latency_column_length + 1,
latency_bucket_label="Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length
+ latency_bucket_column_length,
) )
) )
@ -2588,14 +2692,20 @@ def format_info_benchmark_json(config, benchmark_information):
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}\ "{bold}\
{overall_label: >{overall_label_length}} \ {overall_label: <{overall_label_length}} \
{overall: <{overall_length}} \ {overall: <{overall_length}} \
{bandwidth_label: >{bandwidth_label_length}} \ {cpu_label: <{cpu_label_length}} \
{cpu: <{cpu_length}} \
{memory_label: <{memory_label_length}} \
{memory: <{memory_length}} \
{network_label: <{network_label_length}} \
{network: <{network_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \ {bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \ {latency_label: <{latency_label_length}} \
{latency: <{latency_length}} \ {latency: <{latency_length}} \
{latency_bucket_label: >{latency_bucket_label_length}} \ {latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket} \ {latency_bucket}\
{end_bold}".format( {end_bold}".format(
bold="", bold="",
end_bold="", end_bold="",
@ -2603,12 +2713,24 @@ def format_info_benchmark_json(config, benchmark_information):
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
overall=overall_data[idx], overall=overall_data[idx],
overall_length=overall_column_length, overall_length=overall_column_length,
cpu_label=cpu_label[idx],
cpu_label_length=cpu_label_length,
cpu=cpu_data[idx],
cpu_length=cpu_column_length,
memory_label=memory_label[idx],
memory_label_length=memory_label_length,
memory=memory_data[idx],
memory_length=memory_column_length,
network_label=network_label[idx],
network_label_length=network_label_length,
network=network_data[idx],
network_length=network_column_length,
bandwidth_label=bandwidth_label[idx], bandwidth_label=bandwidth_label[idx],
bandwidth_label_length=bandwidth_label_length, bandwidth_label_length=bandwidth_label_length,
bandwidth=bandwidth_data[idx], bandwidth=bandwidth_data[idx],
bandwidth_length=bandwidth_column_length, bandwidth_length=bandwidth_column_length,
iops=iops_data[idx], latency_label=lat_label[idx],
iops_length=iops_column_length, latency_label_length=latency_label_length,
latency=lat_data[idx], latency=lat_data[idx],
latency_length=latency_column_length, latency_length=latency_column_length,
latency_bucket_label=lat_bucket_label[idx], latency_bucket_label=lat_bucket_label[idx],
@ -2617,4 +2739,4 @@ def format_info_benchmark_json(config, benchmark_information):
) )
) )
return "\n".join(ainformation) return "\n".join(ainformation) + "\n"

View File

@ -19,31 +19,34 @@
# #
############################################################################### ###############################################################################
import os
import psutil
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import subprocess
from datetime import datetime from datetime import datetime
from json import loads, dumps from json import loads, dumps
from time import sleep
from daemon_lib.celery import start, fail, log_info, update, finish from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph import daemon_lib.ceph as pvc_ceph
# Define the current test format # Define the current test format
TEST_FORMAT = 1 TEST_FORMAT = 2
# We run a total of 8 tests, to give a generalized idea of performance on the cluster: # We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 8GB with a 4M block size # 1. A sequential read test of 64GB with a 4M block size
# 2. A sequential write test of 8GB with a 4M block size # 2. A sequential write test of 64GB with a 4M block size
# 3. A random read test of 8GB with a 4M block size # 3. A random read test of 64GB with a 4M block size
# 4. A random write test of 8GB with a 4M block size # 4. A random write test of 64GB with a 4M block size
# 5. A random read test of 8GB with a 256k block size # 5. A random read test of 64GB with a 256k block size
# 6. A random write test of 8GB with a 256k block size # 6. A random write test of 64GB with a 256k block size
# 7. A random read test of 8GB with a 4k block size # 7. A random read test of 64GB with a 4k block size
# 8. A random write test of 8GB with a 4k block size # 8. A random write test of 64GB with a 4k block size
# Taken together, these 8 results should give a very good indication of the overall storage performance # Taken together, these 8 results should give a very good indication of the overall storage performance
# for a variety of workloads. # for a variety of workloads.
test_matrix = { test_matrix = {
@ -100,7 +103,7 @@ test_matrix = {
# Specify the benchmark volume name and size # Specify the benchmark volume name and size
benchmark_volume_name = "pvcbenchmark" benchmark_volume_name = "pvcbenchmark"
benchmark_volume_size = "8G" benchmark_volume_size = "64G"
# #
@ -226,7 +229,7 @@ def cleanup_benchmark_volume(
def run_benchmark_job( def run_benchmark_job(
test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None config, test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
): ):
test_spec = test_matrix[test] test_spec = test_matrix[test]
log_info(None, f"Running test '{test}'") log_info(None, f"Running test '{test}'")
@ -256,31 +259,165 @@ def run_benchmark_job(
) )
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split()))) log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
# Run the fio command manually instead of using our run_os_command wrapper
# This will help us gather statistics about this node while it's running
process = subprocess.Popen(
fio_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait 15 seconds for the test to start
log_info(None, "Waiting 15 seconds for test resource stabilization")
sleep(15)
# Set up function to get process CPU utilization by name
def get_cpu_utilization_by_name(process_name):
cpu_usage = 0
for proc in psutil.process_iter(["name", "cpu_percent"]):
if proc.info["name"] == process_name:
cpu_usage += proc.info["cpu_percent"]
return cpu_usage
# Set up function to get process memory utilization by name
def get_memory_utilization_by_name(process_name):
memory_usage = 0
for proc in psutil.process_iter(["name", "memory_percent"]):
if proc.info["name"] == process_name:
memory_usage += proc.info["memory_percent"]
return memory_usage
# Set up function to get network traffic utilization in bps
def get_network_traffic_bps(interface, duration=1):
# Get initial network counters
net_io_start = psutil.net_io_counters(pernic=True)
if interface not in net_io_start:
return None, None
stats_start = net_io_start[interface]
bytes_sent_start = stats_start.bytes_sent
bytes_recv_start = stats_start.bytes_recv
# Wait for the specified duration
sleep(duration)
# Get final network counters
net_io_end = psutil.net_io_counters(pernic=True)
stats_end = net_io_end[interface]
bytes_sent_end = stats_end.bytes_sent
bytes_recv_end = stats_end.bytes_recv
# Calculate bytes per second
bytes_sent_per_sec = (bytes_sent_end - bytes_sent_start) / duration
bytes_recv_per_sec = (bytes_recv_end - bytes_recv_start) / duration
# Convert to bits per second (bps)
bits_sent_per_sec = bytes_sent_per_sec * 8
bits_recv_per_sec = bytes_recv_per_sec * 8
bits_total_per_sec = bits_sent_per_sec + bits_recv_per_sec
return bits_sent_per_sec, bits_recv_per_sec, bits_total_per_sec
log_info(None, f"Starting system resource polling for test '{test}'")
storage_interface = config["storage_dev"]
total_cpus = psutil.cpu_count(logical=True)
ticks = 1
osd_cpu_utilization = 0
osd_memory_utilization = 0
mon_cpu_utilization = 0
mon_memory_utilization = 0
total_cpu_utilization = 0
total_memory_utilization = 0
storage_sent_bps = 0
storage_recv_bps = 0
storage_total_bps = 0
while process.poll() is None:
# Do collection of statistics like network bandwidth and cpu utilization
current_osd_cpu_utilization = get_cpu_utilization_by_name("ceph-osd")
current_osd_memory_utilization = get_memory_utilization_by_name("ceph-osd")
current_mon_cpu_utilization = get_cpu_utilization_by_name("ceph-mon")
current_mon_memory_utilization = get_memory_utilization_by_name("ceph-mon")
current_total_cpu_utilization = psutil.cpu_percent(interval=1)
current_total_memory_utilization = psutil.virtual_memory().percent
(
current_storage_sent_bps,
current_storage_recv_bps,
current_storage_total_bps,
) = get_network_traffic_bps(storage_interface)
# Recheck if the process is done yet; if it's not, we add the values and increase the ticks
# This helps ensure that if the process finishes earlier than the longer polls above,
# this particular tick isn't counted which can skew the average
if process.poll() is None:
osd_cpu_utilization += current_osd_cpu_utilization
osd_memory_utilization += current_osd_memory_utilization
mon_cpu_utilization += current_mon_cpu_utilization
mon_memory_utilization += current_mon_memory_utilization
total_cpu_utilization += current_total_cpu_utilization
total_memory_utilization += current_total_memory_utilization
storage_sent_bps += current_storage_sent_bps
storage_recv_bps += current_storage_recv_bps
storage_total_bps += current_storage_total_bps
ticks += 1
# Get the 1-minute load average and CPU utilization, which covers the test duration
load1, _, _ = os.getloadavg()
load1 = round(load1, 2)
# Calculate the average CPU utilization values over the runtime
# Divide the OSD and MON CPU utilization by the total number of CPU cores, because
# the total is divided this way
avg_osd_cpu_utilization = round(osd_cpu_utilization / ticks / total_cpus, 2)
avg_osd_memory_utilization = round(osd_memory_utilization / ticks, 2)
avg_mon_cpu_utilization = round(mon_cpu_utilization / ticks / total_cpus, 2)
avg_mon_memory_utilization = round(mon_memory_utilization / ticks, 2)
avg_total_cpu_utilization = round(total_cpu_utilization / ticks, 2)
avg_total_memory_utilization = round(total_memory_utilization / ticks, 2)
avg_storage_sent_bps = round(storage_sent_bps / ticks, 2)
avg_storage_recv_bps = round(storage_recv_bps / ticks, 2)
avg_storage_total_bps = round(storage_total_bps / ticks, 2)
stdout, stderr = process.communicate()
retcode = process.returncode
resource_data = {
"avg_cpu_util_percent": {
"total": avg_total_cpu_utilization,
"ceph-mon": avg_mon_cpu_utilization,
"ceph-osd": avg_osd_cpu_utilization,
},
"avg_memory_util_percent": {
"total": avg_total_memory_utilization,
"ceph-mon": avg_mon_memory_utilization,
"ceph-osd": avg_osd_memory_utilization,
},
"avg_network_util_bps": {
"sent": avg_storage_sent_bps,
"recv": avg_storage_recv_bps,
"total": avg_storage_total_bps,
},
}
try: try:
jstdout = loads(stdout) jstdout = loads(stdout)
if retcode: if retcode:
raise raise
except Exception: except Exception:
cleanup( return None, None
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}': {stderr}",
)
return jstdout return resource_data, jstdout
def worker_run_benchmark(zkhandler, celery, config, pool): def worker_run_benchmark(zkhandler, celery, config, pool, name):
# Phase 0 - connect to databases # Phase 0 - connect to databases
if not name:
cur_time = datetime.now().isoformat(timespec="seconds") cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node") cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}" job_name = f"{cur_time}_{cur_primary}"
else:
job_name = name
current_stage = 0 current_stage = 0
total_stages = 13 total_stages = 13
@ -358,7 +495,8 @@ def worker_run_benchmark(zkhandler, celery, config, pool):
total=total_stages, total=total_stages,
) )
results[test] = run_benchmark_job( resource_data, fio_data = run_benchmark_job(
config,
test, test,
pool, pool,
job_name=job_name, job_name=job_name,
@ -366,6 +504,25 @@ def worker_run_benchmark(zkhandler, celery, config, pool):
db_cur=db_cur, db_cur=db_cur,
zkhandler=zkhandler, zkhandler=zkhandler,
) )
if resource_data is None or fio_data is None:
cleanup_benchmark_volume(
pool,
job_name=job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}'",
)
results[test] = {**resource_data, **fio_data}
# Phase 3 - cleanup # Phase 3 - cleanup
current_stage += 1 current_stage += 1

2
debian/control vendored
View File

@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
Package: pvc-daemon-api Package: pvc-daemon-api
Architecture: all Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate Depends: systemd, pvc-daemon-common, gunicorn, python3-gunicorn, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager A KVM/Zookeeper/Ceph-based VM and private cloud manager
. .

View File

@ -96,12 +96,12 @@ def create_vm(
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on") @celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"): def storage_benchmark(self, pool=None, name=None, run_on="primary"):
@ZKConnection(config) @ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool): def run_storage_benchmark(zkhandler, self, pool, name):
return worker_run_benchmark(zkhandler, self, config, pool) return worker_run_benchmark(zkhandler, self, config, pool, name)
return run_storage_benchmark(self, pool) return run_storage_benchmark(self, pool, name)
@celery.task(name="cluster.autobackup", bind=True, routing_key="run_on") @celery.task(name="cluster.autobackup", bind=True, routing_key="run_on")