Compare commits

..

No commits in common. "41cd34ba4de367d591f09146cfd49cfc12203a4f" and "73c0834f85ad9ef2a17b99643b8c83c3915d1bd3" have entirely different histories.

8 changed files with 270 additions and 540 deletions

View File

@ -19,13 +19,6 @@
# #
############################################################################### ###############################################################################
import sys import pvcapid.Daemon # noqa: F401
from os import path
# Ensure current directory (/usr/share/pvc) is in the system path for Gunicorn
current_dir = path.dirname(path.abspath(__file__))
sys.path.append(current_dir)
import pvcapid.Daemon # noqa: F401, E402
pvcapid.Daemon.entrypoint() pvcapid.Daemon.entrypoint()

View File

@ -19,13 +19,15 @@
# #
############################################################################### ###############################################################################
import subprocess
from ssl import SSLContext, TLSVersion from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg import daemon_lib.config as cfg
# Daemon version # Daemon version
version = "0.9.100~git-73c0834f" version = "0.9.100"
# API version # API version
API_VERSION = 1.0 API_VERSION = 1.0
@ -51,6 +53,7 @@ def strtobool(stringv):
# Configuration Parsing # Configuration Parsing
########################################################## ##########################################################
# Get our configuration # Get our configuration
config = cfg.get_configuration() config = cfg.get_configuration()
config["daemon_name"] = "pvcapid" config["daemon_name"] = "pvcapid"
@ -58,16 +61,22 @@ config["daemon_version"] = version
########################################################## ##########################################################
# Flask App Creation for Gunicorn # Entrypoint
########################################################## ##########################################################
def create_app(): def entrypoint():
""" import pvcapid.flaskapi as pvc_api # noqa: E402
Create and return the Flask app and SSL context if necessary.
""" if config["api_ssl_enabled"]:
# Import the Flask app from pvcapid.flaskapi after adjusting the path context = SSLContext()
import pvcapid.flaskapi as pvc_api context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs()
context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
context = None
# Print our startup messages # Print our startup messages
print("") print("")
@ -93,69 +102,9 @@ def create_app():
print("") print("")
pvc_api.celery_startup() pvc_api.celery_startup()
pvc_api.app.run(
return pvc_api.app config["api_listen_address"],
config["api_listen_port"],
threaded=True,
########################################################## ssl_context=context,
# Entrypoint )
##########################################################
def entrypoint():
if config["debug"]:
app = create_app()
if config["api_ssl_enabled"]:
ssl_context = SSLContext()
ssl_context.minimum_version = TLSVersion.TLSv1
ssl_context.get_ca_certs()
ssl_context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
ssl_context = None
app.run(
config["api_listen_address"],
config["api_listen_port"],
threaded=True,
ssl_context=ssl_context,
)
else:
# Build the command to run Gunicorn
gunicorn_cmd = [
"gunicorn",
"--workers",
"1",
"--threads",
"8",
"--timeout",
"86400",
"--bind",
"{}:{}".format(config["api_listen_address"], config["api_listen_port"]),
"pvcapid.Daemon:create_app()",
"--log-level",
"info",
"--access-logfile",
"-",
"--error-logfile",
"-",
]
if config["api_ssl_enabled"]:
gunicorn_cmd += [
"--certfile",
config["api_ssl_cert_file"],
"--keyfile",
config["api_ssl_key_file"],
]
# Run Gunicorn
try:
subprocess.run(gunicorn_cmd)
except KeyboardInterrupt:
exit(0)
except Exception as e:
print(e)
exit(1)

View File

@ -5058,7 +5058,95 @@ class API_Storage_Ceph_Benchmark(Resource):
description: The PVC benchmark format of the results description: The PVC benchmark format of the results
benchmark_result: benchmark_result:
type: object type: object
description: A benchmark test result; format not documented due to complexity description: A format 0 test result
properties:
test_name:
type: object
properties:
overall:
type: object
properties:
iosize:
type: string (integer)
description: The total size of the benchmark data
bandwidth:
type: string (integer)
description: The average bandwidth (KiB/s)
iops:
type: string (integer)
description: The average IOPS
runtime:
type: string (integer)
description: The total test time in milliseconds
latency:
type: object
properties:
min:
type: string (integer)
description: The minimum latency measurement
max:
type: string (integer)
description: The maximum latency measurement
mean:
type: string (float)
description: The mean latency measurement
stdev:
type: string (float)
description: The standard deviation of latency
bandwidth:
type: object
properties:
min:
type: string (integer)
description: The minimum bandwidth (KiB/s) measurement
max:
type: string (integer)
description: The maximum bandwidth (KiB/s) measurement
mean:
type: string (float)
description: The mean bandwidth (KiB/s) measurement
stdev:
type: string (float)
description: The standard deviation of bandwidth
numsamples:
type: string (integer)
description: The number of samples taken during the test
iops:
type: object
properties:
min:
type: string (integer)
description: The minimum IOPS measurement
max:
type: string (integer)
description: The maximum IOPS measurement
mean:
type: string (float)
description: The mean IOPS measurement
stdev:
type: string (float)
description: The standard deviation of IOPS
numsamples:
type: string (integer)
description: The number of samples taken during the test
cpu:
type: object
properties:
user:
type: string (float percentage)
description: The percentage of test time spent in user space
system:
type: string (float percentage)
description: The percentage of test time spent in system (kernel) space
ctxsw:
type: string (integer)
description: The number of context switches during the test
majfault:
type: string (integer)
description: The number of major page faults during the test
minfault:
type: string (integer)
description: The number of minor page faults during the test
""" """
return list_benchmarks(config, reqargs.get("job", None)) return list_benchmarks(config, reqargs.get("job", None))
@ -5069,10 +5157,6 @@ class API_Storage_Ceph_Benchmark(Resource):
"required": True, "required": True,
"helptext": "A valid pool must be specified.", "helptext": "A valid pool must be specified.",
}, },
{
"name": "name",
"required": False,
},
] ]
) )
@Authenticator @Authenticator
@ -5088,11 +5172,6 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string type: string
required: true required: true
description: The PVC storage pool to benchmark description: The PVC storage pool to benchmark
- in: query
name: name
type: string
required: false
description: An optional override name for the job
responses: responses:
200: 200:
description: OK description: OK
@ -5110,10 +5189,7 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400 }, 400
task = run_celery_task( task = run_celery_task(
"storage.benchmark", "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
pool=reqargs.get("pool", None),
name=reqargs.get("name", None),
run_on="primary",
) )
return ( return (
{ {

View File

@ -3755,13 +3755,6 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.") @click.command(name="run", short_help="Run a storage benchmark.")
@connection_req @connection_req
@click.argument("pool") @click.argument("pool")
@click.option(
"--name",
"name",
default=None,
show_default=False,
help="Use a custom name for the job",
)
@click.option( @click.option(
"--wait/--no-wait", "--wait/--no-wait",
"wait_flag", "wait_flag",
@ -3773,14 +3766,12 @@ def cli_storage_benchmark():
@confirm_opt( @confirm_opt(
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue" "Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
) )
def cli_storage_benchmark_run(pool, name, wait_flag): def cli_storage_benchmark_run(pool, wait_flag):
""" """
Run a storage benchmark on POOL in the background. Run a storage benchmark on POOL in the background.
""" """
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run( retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)
CLI_CONFIG, pool, name, wait_flag
)
if retcode and wait_flag: if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)

View File

@ -30,7 +30,6 @@ from requests_toolbelt.multipart.encoder import (
import pvc.lib.ansiprint as ansiprint import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
from pvc.cli.helpers import MAX_CONTENT_WIDTH
# #
# Supplemental functions # Supplemental functions
@ -1725,17 +1724,15 @@ def format_list_snapshot(config, snapshot_list):
# #
# Benchmark functions # Benchmark functions
# #
def ceph_benchmark_run(config, pool, name, wait_flag): def ceph_benchmark_run(config, pool, wait_flag):
""" """
Run a storage benchmark against {pool} Run a storage benchmark against {pool}
API endpoint: POST /api/v1/storage/ceph/benchmark API endpoint: POST /api/v1/storage/ceph/benchmark
API arguments: pool={pool}, name={name} API arguments: pool={pool}
API schema: {message} API schema: {message}
""" """
params = {"pool": pool} params = {"pool": pool}
if name:
params["name"] = name
response = call_api(config, "post", "/storage/ceph/benchmark", params=params) response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
return get_wait_retdata(response, wait_flag) return get_wait_retdata(response, wait_flag)
@ -1807,7 +1804,7 @@ def get_benchmark_list_results(benchmark_format, benchmark_data):
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_legacy(
benchmark_data benchmark_data
) )
elif benchmark_format == 1 or benchmark_format == 2: elif benchmark_format == 1:
benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json( benchmark_bandwidth, benchmark_iops = get_benchmark_list_results_json(
benchmark_data benchmark_data
) )
@ -2009,7 +2006,6 @@ def format_info_benchmark(config, benchmark_information):
benchmark_matrix = { benchmark_matrix = {
0: format_info_benchmark_legacy, 0: format_info_benchmark_legacy,
1: format_info_benchmark_json, 1: format_info_benchmark_json,
2: format_info_benchmark_json,
} }
benchmark_version = benchmark_information[0]["test_format"] benchmark_version = benchmark_information[0]["test_format"]
@ -2344,15 +2340,12 @@ def format_info_benchmark_json(config, benchmark_information):
if benchmark_information["benchmark_result"] == "Running": if benchmark_information["benchmark_result"] == "Running":
return "Benchmark test is still running." return "Benchmark test is still running."
benchmark_format = benchmark_information["test_format"]
benchmark_details = benchmark_information["benchmark_result"] benchmark_details = benchmark_information["benchmark_result"]
# Format a nice output; do this line-by-line then concat the elements at the end # Format a nice output; do this line-by-line then concat the elements at the end
ainformation = [] ainformation = []
ainformation.append( ainformation.append(
"{}Storage Benchmark details (format {}):{}".format( "{}Storage Benchmark details:{}".format(ansiprint.bold(), ansiprint.end())
ansiprint.bold(), benchmark_format, ansiprint.end()
)
) )
nice_test_name_map = { nice_test_name_map = {
@ -2400,7 +2393,7 @@ def format_info_benchmark_json(config, benchmark_information):
if element[1] != 0: if element[1] != 0:
useful_latency_tree.append(element) useful_latency_tree.append(element)
max_rows = 5 max_rows = 9
if len(useful_latency_tree) > 9: if len(useful_latency_tree) > 9:
max_rows = len(useful_latency_tree) max_rows = len(useful_latency_tree)
elif len(useful_latency_tree) < 9: elif len(useful_latency_tree) < 9:
@ -2409,10 +2402,15 @@ def format_info_benchmark_json(config, benchmark_information):
# Format the static data # Format the static data
overall_label = [ overall_label = [
"BW/s:", "Overall BW/s:",
"IOPS:", "Overall IOPS:",
"I/O:", "Total I/O:",
"Time:", "Runtime (s):",
"User CPU %:",
"System CPU %:",
"Ctx Switches:",
"Major Faults:",
"Minor Faults:",
] ]
while len(overall_label) < max_rows: while len(overall_label) < max_rows:
overall_label.append("") overall_label.append("")
@ -2421,149 +2419,68 @@ def format_info_benchmark_json(config, benchmark_information):
format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])), format_bytes_tohuman(int(job_details[io_class]["bw_bytes"])),
format_ops_tohuman(int(job_details[io_class]["iops"])), format_ops_tohuman(int(job_details[io_class]["iops"])),
format_bytes_tohuman(int(job_details[io_class]["io_bytes"])), format_bytes_tohuman(int(job_details[io_class]["io_bytes"])),
str(job_details["job_runtime"] / 1000) + "s", job_details["job_runtime"] / 1000,
job_details["usr_cpu"],
job_details["sys_cpu"],
job_details["ctx"],
job_details["majf"],
job_details["minf"],
] ]
while len(overall_data) < max_rows: while len(overall_data) < max_rows:
overall_data.append("") overall_data.append("")
cpu_label = [
"Total:",
"User:",
"Sys:",
"OSD:",
"MON:",
]
while len(cpu_label) < max_rows:
cpu_label.append("")
cpu_data = [
(
benchmark_details[test]["avg_cpu_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
round(job_details["usr_cpu"], 2),
round(job_details["sys_cpu"], 2),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_cpu_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(cpu_data) < max_rows:
cpu_data.append("")
memory_label = [
"Total:",
"OSD:",
"MON:",
]
while len(memory_label) < max_rows:
memory_label.append("")
memory_data = [
(
benchmark_details[test]["avg_memory_util_percent"]["total"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-osd"]
if benchmark_format > 1
else "N/A"
),
(
benchmark_details[test]["avg_memory_util_percent"]["ceph-mon"]
if benchmark_format > 1
else "N/A"
),
]
while len(memory_data) < max_rows:
memory_data.append("")
network_label = [
"Total:",
"Sent:",
"Recv:",
]
while len(network_label) < max_rows:
network_label.append("")
network_data = [
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["total"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["sent"])
)
if benchmark_format > 1
else "N/A"
),
(
format_bytes_tohuman(
int(benchmark_details[test]["avg_network_util_bps"]["recv"])
)
if benchmark_format > 1
else "N/A"
),
]
while len(network_data) < max_rows:
network_data.append("")
bandwidth_label = [ bandwidth_label = [
"Min:", "Min:",
"Max:", "Max:",
"Mean:", "Mean:",
"StdDev:", "StdDev:",
"Samples:", "Samples:",
"",
"",
"",
"",
] ]
while len(bandwidth_label) < max_rows: while len(bandwidth_label) < max_rows:
bandwidth_label.append("") bandwidth_label.append("")
bandwidth_data = [ bandwidth_data = [
format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024) format_bytes_tohuman(int(job_details[io_class]["bw_min"]) * 1024),
+ " / " format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024),
+ format_ops_tohuman(int(job_details[io_class]["iops_min"])), format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024),
format_bytes_tohuman(int(job_details[io_class]["bw_max"]) * 1024) format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024),
+ " / " job_details[io_class]["bw_samples"],
+ format_ops_tohuman(int(job_details[io_class]["iops_max"])), "",
format_bytes_tohuman(int(job_details[io_class]["bw_mean"]) * 1024) "",
+ " / " "",
+ format_ops_tohuman(int(job_details[io_class]["iops_mean"])), "",
format_bytes_tohuman(int(job_details[io_class]["bw_dev"]) * 1024)
+ " / "
+ format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
str(job_details[io_class]["bw_samples"])
+ " / "
+ str(job_details[io_class]["iops_samples"]),
] ]
while len(bandwidth_data) < max_rows: while len(bandwidth_data) < max_rows:
bandwidth_data.append("") bandwidth_data.append("")
lat_label = [ iops_data = [
"Min:", format_ops_tohuman(int(job_details[io_class]["iops_min"])),
"Max:", format_ops_tohuman(int(job_details[io_class]["iops_max"])),
"Mean:", format_ops_tohuman(int(job_details[io_class]["iops_mean"])),
"StdDev:", format_ops_tohuman(int(job_details[io_class]["iops_stddev"])),
job_details[io_class]["iops_samples"],
"",
"",
"",
"",
] ]
while len(lat_label) < max_rows: while len(iops_data) < max_rows:
lat_label.append("") iops_data.append("")
lat_data = [ lat_data = [
int(job_details[io_class]["lat_ns"]["min"]) / 1000, int(job_details[io_class]["lat_ns"]["min"]) / 1000,
int(job_details[io_class]["lat_ns"]["max"]) / 1000, int(job_details[io_class]["lat_ns"]["max"]) / 1000,
int(job_details[io_class]["lat_ns"]["mean"]) / 1000, int(job_details[io_class]["lat_ns"]["mean"]) / 1000,
int(job_details[io_class]["lat_ns"]["stddev"]) / 1000, int(job_details[io_class]["lat_ns"]["stddev"]) / 1000,
"",
"",
"",
"",
"",
] ]
while len(lat_data) < max_rows: while len(lat_data) < max_rows:
lat_data.append("") lat_data.append("")
@ -2572,119 +2489,98 @@ def format_info_benchmark_json(config, benchmark_information):
lat_bucket_label = list() lat_bucket_label = list()
lat_bucket_data = list() lat_bucket_data = list()
for element in useful_latency_tree: for element in useful_latency_tree:
lat_bucket_label.append(element[0] + ":" if element[0] else "") lat_bucket_label.append(element[0])
lat_bucket_data.append(round(float(element[1]), 2) if element[1] else "") lat_bucket_data.append(element[1])
while len(lat_bucket_label) < max_rows:
lat_bucket_label.append("")
while len(lat_bucket_data) < max_rows:
lat_bucket_label.append("")
# Column default widths # Column default widths
overall_label_length = 5 overall_label_length = 0
overall_column_length = 0 overall_column_length = 0
cpu_label_length = 6 bandwidth_label_length = 0
cpu_column_length = 0 bandwidth_column_length = 11
memory_label_length = 6 iops_column_length = 4
memory_column_length = 0 latency_column_length = 12
network_label_length = 6
network_column_length = 6
bandwidth_label_length = 8
bandwidth_column_length = 0
latency_label_length = 7
latency_column_length = 0
latency_bucket_label_length = 0 latency_bucket_label_length = 0
latency_bucket_column_length = 0
# Column layout: # Column layout:
# Overall CPU Memory Network Bandwidth/IOPS Latency Percentiles # General Bandwidth IOPS Latency Percentiles
# --------- ----- ------- -------- -------------- -------- --------------- # --------- ---------- -------- -------- ---------------
# BW Total Total Total Min Min A # Size Min Min Min A
# IOPS Usr OSD Send Max Max B # BW Max Max Max B
# Time Sys MON Recv Mean Mean ... # IOPS Mean Mean Mean ...
# Size OSD StdDev StdDev Z # Runtime StdDev StdDev StdDev Z
# MON Samples # UsrCPU Samples Samples
# SysCPU
# CtxSw
# MajFault
# MinFault
# Set column widths # Set column widths
for item in overall_label:
_item_length = len(str(item))
if _item_length > overall_label_length:
overall_label_length = _item_length
for item in overall_data: for item in overall_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > overall_column_length: if _item_length > overall_column_length:
overall_column_length = _item_length overall_column_length = _item_length
for item in cpu_data: test_name_length = len(nice_test_name_map[test])
_item_length = len(str(item)) if test_name_length > overall_label_length + overall_column_length:
if _item_length > cpu_column_length: _diff = test_name_length - (overall_label_length + overall_column_length)
cpu_column_length = _item_length overall_column_length += _diff
for item in memory_data: for item in bandwidth_label:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > memory_column_length: if _item_length > bandwidth_label_length:
memory_column_length = _item_length bandwidth_label_length = _item_length
for item in network_data:
_item_length = len(str(item))
if _item_length > network_column_length:
network_column_length = _item_length
for item in bandwidth_data: for item in bandwidth_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > bandwidth_column_length: if _item_length > bandwidth_column_length:
bandwidth_column_length = _item_length bandwidth_column_length = _item_length
for item in iops_data:
_item_length = len(str(item))
if _item_length > iops_column_length:
iops_column_length = _item_length
for item in lat_data: for item in lat_data:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_column_length: if _item_length > latency_column_length:
latency_column_length = _item_length latency_column_length = _item_length
for item in lat_bucket_data: for item in lat_bucket_label:
_item_length = len(str(item)) _item_length = len(str(item))
if _item_length > latency_bucket_column_length: if _item_length > latency_bucket_label_length:
latency_bucket_column_length = _item_length latency_bucket_label_length = _item_length
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}{overall_label: <{overall_label_length}} {header_fill}{end_bold}".format( "{bold}\
{overall_label: <{overall_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{iops: <{iops_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket} \
{end_bold}".format(
bold=ansiprint.bold(), bold=ansiprint.bold(),
end_bold=ansiprint.end(), end_bold=ansiprint.end(),
overall_label=nice_test_name_map[test], overall_label=nice_test_name_map[test],
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
header_fill="-" bandwidth_label="",
* ( bandwidth_label_length=bandwidth_label_length,
(MAX_CONTENT_WIDTH if MAX_CONTENT_WIDTH <= 120 else 120) bandwidth="Bandwidth/s",
- len(nice_test_name_map[test]) bandwidth_length=bandwidth_column_length,
- 4 iops="IOPS",
), iops_length=iops_column_length,
) latency="Latency (μs)",
) latency_length=latency_column_length,
latency_bucket_label="Latency Buckets (μs/%)",
ainformation.append( latency_bucket_label_length=latency_bucket_label_length,
"{bold}\ latency_bucket="",
{overall_label: <{overall_label_length}} \
{cpu_label: <{cpu_label_length}} \
{memory_label: <{memory_label_length}} \
{network_label: <{network_label_length}} \
{bandwidth_label: <{bandwidth_label_length}} \
{latency_label: <{latency_label_length}} \
{latency_bucket_label: <{latency_bucket_label_length}}\
{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
overall_label="Overall",
overall_label_length=overall_label_length + overall_column_length + 1,
cpu_label="CPU (%)",
cpu_label_length=cpu_label_length + cpu_column_length + 1,
memory_label="Memory (%)",
memory_label_length=memory_label_length + memory_column_length + 1,
network_label="Network (bps)",
network_label_length=network_label_length + network_column_length + 1,
bandwidth_label="Bandwidth / IOPS",
bandwidth_label_length=bandwidth_label_length
+ bandwidth_column_length
+ 1,
latency_label="Latency (μs)",
latency_label_length=latency_label_length + latency_column_length + 1,
latency_bucket_label="Buckets (μs/%)",
latency_bucket_label_length=latency_bucket_label_length
+ latency_bucket_column_length,
) )
) )
@ -2692,20 +2588,14 @@ def format_info_benchmark_json(config, benchmark_information):
# Top row (Headers) # Top row (Headers)
ainformation.append( ainformation.append(
"{bold}\ "{bold}\
{overall_label: <{overall_label_length}} \ {overall_label: >{overall_label_length}} \
{overall: <{overall_length}} \ {overall: <{overall_length}} \
{cpu_label: <{cpu_label_length}} \ {bandwidth_label: >{bandwidth_label_length}} \
{cpu: <{cpu_length}} \ {bandwidth: <{bandwidth_length}} \
{memory_label: <{memory_label_length}} \ {iops: <{iops_length}} \
{memory: <{memory_length}} \ {latency: <{latency_length}} \
{network_label: <{network_label_length}} \ {latency_bucket_label: >{latency_bucket_label_length}} \
{network: <{network_length}} \ {latency_bucket} \
{bandwidth_label: <{bandwidth_label_length}} \
{bandwidth: <{bandwidth_length}} \
{latency_label: <{latency_label_length}} \
{latency: <{latency_length}} \
{latency_bucket_label: <{latency_bucket_label_length}} \
{latency_bucket}\
{end_bold}".format( {end_bold}".format(
bold="", bold="",
end_bold="", end_bold="",
@ -2713,24 +2603,12 @@ def format_info_benchmark_json(config, benchmark_information):
overall_label_length=overall_label_length, overall_label_length=overall_label_length,
overall=overall_data[idx], overall=overall_data[idx],
overall_length=overall_column_length, overall_length=overall_column_length,
cpu_label=cpu_label[idx],
cpu_label_length=cpu_label_length,
cpu=cpu_data[idx],
cpu_length=cpu_column_length,
memory_label=memory_label[idx],
memory_label_length=memory_label_length,
memory=memory_data[idx],
memory_length=memory_column_length,
network_label=network_label[idx],
network_label_length=network_label_length,
network=network_data[idx],
network_length=network_column_length,
bandwidth_label=bandwidth_label[idx], bandwidth_label=bandwidth_label[idx],
bandwidth_label_length=bandwidth_label_length, bandwidth_label_length=bandwidth_label_length,
bandwidth=bandwidth_data[idx], bandwidth=bandwidth_data[idx],
bandwidth_length=bandwidth_column_length, bandwidth_length=bandwidth_column_length,
latency_label=lat_label[idx], iops=iops_data[idx],
latency_label_length=latency_label_length, iops_length=iops_column_length,
latency=lat_data[idx], latency=lat_data[idx],
latency_length=latency_column_length, latency_length=latency_column_length,
latency_bucket_label=lat_bucket_label[idx], latency_bucket_label=lat_bucket_label[idx],
@ -2739,4 +2617,4 @@ def format_info_benchmark_json(config, benchmark_information):
) )
) )
return "\n".join(ainformation) + "\n" return "\n".join(ainformation)

View File

@ -19,34 +19,31 @@
# #
############################################################################### ###############################################################################
import os
import psutil
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import subprocess
from datetime import datetime from datetime import datetime
from json import loads, dumps from json import loads, dumps
from time import sleep
from daemon_lib.celery import start, fail, log_info, update, finish from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph import daemon_lib.ceph as pvc_ceph
# Define the current test format # Define the current test format
TEST_FORMAT = 2 TEST_FORMAT = 1
# We run a total of 8 tests, to give a generalized idea of performance on the cluster: # We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 64GB with a 4M block size # 1. A sequential read test of 8GB with a 4M block size
# 2. A sequential write test of 64GB with a 4M block size # 2. A sequential write test of 8GB with a 4M block size
# 3. A random read test of 64GB with a 4M block size # 3. A random read test of 8GB with a 4M block size
# 4. A random write test of 64GB with a 4M block size # 4. A random write test of 8GB with a 4M block size
# 5. A random read test of 64GB with a 256k block size # 5. A random read test of 8GB with a 256k block size
# 6. A random write test of 64GB with a 256k block size # 6. A random write test of 8GB with a 256k block size
# 7. A random read test of 64GB with a 4k block size # 7. A random read test of 8GB with a 4k block size
# 8. A random write test of 64GB with a 4k block size # 8. A random write test of 8GB with a 4k block size
# Taken together, these 8 results should give a very good indication of the overall storage performance # Taken together, these 8 results should give a very good indication of the overall storage performance
# for a variety of workloads. # for a variety of workloads.
test_matrix = { test_matrix = {
@ -103,7 +100,7 @@ test_matrix = {
# Specify the benchmark volume name and size # Specify the benchmark volume name and size
benchmark_volume_name = "pvcbenchmark" benchmark_volume_name = "pvcbenchmark"
benchmark_volume_size = "64G" benchmark_volume_size = "8G"
# #
@ -229,7 +226,7 @@ def cleanup_benchmark_volume(
def run_benchmark_job( def run_benchmark_job(
config, test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
): ):
test_spec = test_matrix[test] test_spec = test_matrix[test]
log_info(None, f"Running test '{test}'") log_info(None, f"Running test '{test}'")
@ -259,165 +256,31 @@ def run_benchmark_job(
) )
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split()))) log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
# Run the fio command manually instead of using our run_os_command wrapper
# This will help us gather statistics about this node while it's running
process = subprocess.Popen(
fio_cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait 15 seconds for the test to start
log_info(None, "Waiting 15 seconds for test resource stabilization")
sleep(15)
# Set up function to get process CPU utilization by name
def get_cpu_utilization_by_name(process_name):
cpu_usage = 0
for proc in psutil.process_iter(["name", "cpu_percent"]):
if proc.info["name"] == process_name:
cpu_usage += proc.info["cpu_percent"]
return cpu_usage
# Set up function to get process memory utilization by name
def get_memory_utilization_by_name(process_name):
memory_usage = 0
for proc in psutil.process_iter(["name", "memory_percent"]):
if proc.info["name"] == process_name:
memory_usage += proc.info["memory_percent"]
return memory_usage
# Set up function to get network traffic utilization in bps
def get_network_traffic_bps(interface, duration=1):
# Get initial network counters
net_io_start = psutil.net_io_counters(pernic=True)
if interface not in net_io_start:
return None, None
stats_start = net_io_start[interface]
bytes_sent_start = stats_start.bytes_sent
bytes_recv_start = stats_start.bytes_recv
# Wait for the specified duration
sleep(duration)
# Get final network counters
net_io_end = psutil.net_io_counters(pernic=True)
stats_end = net_io_end[interface]
bytes_sent_end = stats_end.bytes_sent
bytes_recv_end = stats_end.bytes_recv
# Calculate bytes per second
bytes_sent_per_sec = (bytes_sent_end - bytes_sent_start) / duration
bytes_recv_per_sec = (bytes_recv_end - bytes_recv_start) / duration
# Convert to bits per second (bps)
bits_sent_per_sec = bytes_sent_per_sec * 8
bits_recv_per_sec = bytes_recv_per_sec * 8
bits_total_per_sec = bits_sent_per_sec + bits_recv_per_sec
return bits_sent_per_sec, bits_recv_per_sec, bits_total_per_sec
log_info(None, f"Starting system resource polling for test '{test}'")
storage_interface = config["storage_dev"]
total_cpus = psutil.cpu_count(logical=True)
ticks = 1
osd_cpu_utilization = 0
osd_memory_utilization = 0
mon_cpu_utilization = 0
mon_memory_utilization = 0
total_cpu_utilization = 0
total_memory_utilization = 0
storage_sent_bps = 0
storage_recv_bps = 0
storage_total_bps = 0
while process.poll() is None:
# Do collection of statistics like network bandwidth and cpu utilization
current_osd_cpu_utilization = get_cpu_utilization_by_name("ceph-osd")
current_osd_memory_utilization = get_memory_utilization_by_name("ceph-osd")
current_mon_cpu_utilization = get_cpu_utilization_by_name("ceph-mon")
current_mon_memory_utilization = get_memory_utilization_by_name("ceph-mon")
current_total_cpu_utilization = psutil.cpu_percent(interval=1)
current_total_memory_utilization = psutil.virtual_memory().percent
(
current_storage_sent_bps,
current_storage_recv_bps,
current_storage_total_bps,
) = get_network_traffic_bps(storage_interface)
# Recheck if the process is done yet; if it's not, we add the values and increase the ticks
# This helps ensure that if the process finishes earlier than the longer polls above,
# this particular tick isn't counted which can skew the average
if process.poll() is None:
osd_cpu_utilization += current_osd_cpu_utilization
osd_memory_utilization += current_osd_memory_utilization
mon_cpu_utilization += current_mon_cpu_utilization
mon_memory_utilization += current_mon_memory_utilization
total_cpu_utilization += current_total_cpu_utilization
total_memory_utilization += current_total_memory_utilization
storage_sent_bps += current_storage_sent_bps
storage_recv_bps += current_storage_recv_bps
storage_total_bps += current_storage_total_bps
ticks += 1
# Get the 1-minute load average and CPU utilization, which covers the test duration
load1, _, _ = os.getloadavg()
load1 = round(load1, 2)
# Calculate the average CPU utilization values over the runtime
# Divide the OSD and MON CPU utilization by the total number of CPU cores, because
# the total is divided this way
avg_osd_cpu_utilization = round(osd_cpu_utilization / ticks / total_cpus, 2)
avg_osd_memory_utilization = round(osd_memory_utilization / ticks, 2)
avg_mon_cpu_utilization = round(mon_cpu_utilization / ticks / total_cpus, 2)
avg_mon_memory_utilization = round(mon_memory_utilization / ticks, 2)
avg_total_cpu_utilization = round(total_cpu_utilization / ticks, 2)
avg_total_memory_utilization = round(total_memory_utilization / ticks, 2)
avg_storage_sent_bps = round(storage_sent_bps / ticks, 2)
avg_storage_recv_bps = round(storage_recv_bps / ticks, 2)
avg_storage_total_bps = round(storage_total_bps / ticks, 2)
stdout, stderr = process.communicate()
retcode = process.returncode
resource_data = {
"avg_cpu_util_percent": {
"total": avg_total_cpu_utilization,
"ceph-mon": avg_mon_cpu_utilization,
"ceph-osd": avg_osd_cpu_utilization,
},
"avg_memory_util_percent": {
"total": avg_total_memory_utilization,
"ceph-mon": avg_mon_memory_utilization,
"ceph-osd": avg_osd_memory_utilization,
},
"avg_network_util_bps": {
"sent": avg_storage_sent_bps,
"recv": avg_storage_recv_bps,
"total": avg_storage_total_bps,
},
}
try: try:
jstdout = loads(stdout) jstdout = loads(stdout)
if retcode: if retcode:
raise raise
except Exception: except Exception:
return None, None cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}': {stderr}",
)
return resource_data, jstdout return jstdout
def worker_run_benchmark(zkhandler, celery, config, pool, name): def worker_run_benchmark(zkhandler, celery, config, pool):
# Phase 0 - connect to databases # Phase 0 - connect to databases
if not name: cur_time = datetime.now().isoformat(timespec="seconds")
cur_time = datetime.now().isoformat(timespec="seconds") cur_primary = zkhandler.read("base.config.primary_node")
cur_primary = zkhandler.read("base.config.primary_node") job_name = f"{cur_time}_{cur_primary}"
job_name = f"{cur_time}_{cur_primary}"
else:
job_name = name
current_stage = 0 current_stage = 0
total_stages = 13 total_stages = 13
@ -495,8 +358,7 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
total=total_stages, total=total_stages,
) )
resource_data, fio_data = run_benchmark_job( results[test] = run_benchmark_job(
config,
test, test,
pool, pool,
job_name=job_name, job_name=job_name,
@ -504,25 +366,6 @@ def worker_run_benchmark(zkhandler, celery, config, pool, name):
db_cur=db_cur, db_cur=db_cur,
zkhandler=zkhandler, zkhandler=zkhandler,
) )
if resource_data is None or fio_data is None:
cleanup_benchmark_volume(
pool,
job_name=job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}'",
)
results[test] = {**resource_data, **fio_data}
# Phase 3 - cleanup # Phase 3 - cleanup
current_stage += 1 current_stage += 1

2
debian/control vendored
View File

@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
Package: pvc-daemon-api Package: pvc-daemon-api
Architecture: all Architecture: all
Depends: systemd, pvc-daemon-common, gunicorn, python3-gunicorn, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager A KVM/Zookeeper/Ceph-based VM and private cloud manager
. .

View File

@ -96,12 +96,12 @@ def create_vm(
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on") @celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, name=None, run_on="primary"): def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config) @ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool, name): def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool, name) return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool, name) return run_storage_benchmark(self, pool)
@celery.task(name="cluster.autobackup", bind=True, routing_key="run_on") @celery.task(name="cluster.autobackup", bind=True, routing_key="run_on")