Convert benchmark to use new Celery step structure

Joshua Boniface 2023-11-16 19:21:36 -05:00
parent 4d23d0419c
commit e8da3714c0
1 changed file with 131 additions and 90 deletions
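This change replaces the task's raw self.update_state() calls and the cleanup side effects of BenchmarkError with the start / update / fail / finish helpers imported from daemon_lib.celery. As a rough orientation, here is a minimal runnable sketch of that calling pattern; the helper bodies below are illustrative stand-ins only (the real implementations live in daemon_lib.celery and also manage actual Celery task state), and example_task is a hypothetical name:

    # Stand-in sketch of the daemon_lib.celery step helpers; the signatures
    # mirror the calls in this diff, the bodies are assumptions for illustration.
    def start(celery, msg, current=0, total=1):
        print(f"[{current}/{total}] START: {msg}")

    def log_info(celery, msg):
        print(f"INFO: {msg}")

    def update(celery, msg, current=0, total=1):
        print(f"[{current}/{total}] UPDATE: {msg}")

    def fail(celery, msg, exception=None):
        # The real helper marks the Celery task FAILED; raising stops the flow here too.
        raise (exception or Exception)(msg)

    def finish(celery, msg, current=1, total=1):
        print(f"[{current}/{total}] FINISH: {msg}")
        return msg

    # Usage pattern adopted by run_benchmark() below (hypothetical task):
    def example_task(self=None):
        current_stage = 0
        total_stages = 2
        start(self, "Running example task", current=current_stage, total=total_stages)

        current_stage += 1
        update(self, "Doing the single work step", current=current_stage, total=total_stages)

        current_stage += 1
        return finish(self, "Example task completed", current=current_stage, total=total_stages)

    example_task()

The pattern keeps one monotonically increasing current_stage counter per task, so every user-visible step reports a consistent current/total progress pair.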


@@ -22,16 +22,22 @@
 import psycopg2
 import psycopg2.extras
 
+from datetime import datetime
+
 from json import loads, dumps
 
 from pvcapid.Daemon import config
 
 from daemon_lib.zkhandler import ZKHandler
+from daemon_lib.celery import start, fail, log_info, update, finish
 
 import daemon_lib.common as pvc_common
 import daemon_lib.ceph as pvc_ceph
 
+# Define the current test format
+TEST_FORMAT = 1
+
 
 # We run a total of 8 tests, to give a generalized idea of performance on the cluster:
 # 1. A sequential read test of 8GB with a 4M block size
 # 2. A sequential write test of 8GB with a 4M block size
@@ -104,27 +110,7 @@ benchmark_volume_size = "8G"
 # Exceptions (used by Celery tasks)
 #
 class BenchmarkError(Exception):
-    """
-    An exception that results from the Benchmark job.
-    """
-
-    def __init__(
-        self, message, job_name=None, db_conn=None, db_cur=None, zkhandler=None
-    ):
-        self.message = message
-        if job_name is not None and db_conn is not None and db_cur is not None:
-            # Clean up our dangling result
-            query = "DELETE FROM storage_benchmarks WHERE job = %s;"
-            args = (job_name,)
-            db_cur.execute(query, args)
-            db_conn.commit()
-            # Close the database connections cleanly
-            close_database(db_conn, db_cur)
-        if job_name is not None and zkhandler is not None:
-            zkhandler.disconnect()
-
-    def __str__(self):
-        return str(self.message)
+    pass
 
 
 #
@@ -132,6 +118,20 @@ class BenchmarkError(Exception):
 #
+
+
+def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
+    if db_conn is not None and db_cur is not None:
+        # Clean up our dangling result
+        query = "DELETE FROM storage_benchmarks WHERE job = %s;"
+        args = (job_name,)
+        db_cur.execute(query, args)
+        db_conn.commit()
+        # Close the database connections cleanly
+        close_database(db_conn, db_cur)
+    if zkhandler is not None:
+        zkhandler.disconnect()
+        del zkhandler
 
 
 # Database connections
 def open_database(config):
     conn = psycopg2.connect(
@@ -193,17 +193,18 @@ def prepare_benchmark_volume(
         zkhandler, pool, benchmark_volume_name, benchmark_volume_size
     )
     if not retcode:
-        raise BenchmarkError(
-            'Failed to create volume "{}" on pool "{}": {}'.format(
-                benchmark_volume_name, pool, retmsg
-            ),
-            job_name=job_name,
+        cleanup(
+            job_name,
             db_conn=db_conn,
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
+        fail(
+            None,
+            f'Failed to create volume "{benchmark_volume_name}" on pool "{pool}": {retmsg}',
+        )
     else:
-        print(retmsg)
+        log_info(None, retmsg)
 
 
 def cleanup_benchmark_volume(
@@ -212,24 +213,25 @@ def cleanup_benchmark_volume(
     # Remove the RBD volume
     retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, benchmark_volume_name)
     if not retcode:
-        raise BenchmarkError(
-            'Failed to remove volume "{}" on pool "{}": {}'.format(
-                benchmark_volume_name, pool, retmsg
-            ),
-            job_name=job_name,
+        cleanup(
+            job_name,
             db_conn=db_conn,
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
+        fail(
+            None,
+            f'Failed to remove volume "{benchmark_volume_name}" from pool "{pool}": {retmsg}',
+        )
     else:
-        print(retmsg)
+        log_info(None, retmsg)
 
 
 def run_benchmark_job(
     test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
 ):
     test_spec = test_matrix[test]
-    print("Running test '{}'".format(test))
+    log_info(None, f"Running test '{test}'")
     fio_cmd = """
         fio \
             --name={test} \
@@ -255,51 +257,73 @@ def run_benchmark_job(
         rw=test_spec["rw"],
     )
 
-    print("Running fio job: {}".format(" ".join(fio_cmd.split())))
+    log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
     retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
-    if retcode:
-        raise BenchmarkError(
-            "Failed to run fio test: {}".format(stderr),
-            job_name=job_name,
+    try:
+        jstdout = loads(stdout)
+        if retcode:
+            raise
+    except Exception:
+        cleanup(
+            job_name,
             db_conn=db_conn,
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
+        fail(
+            None,
+            f"Failed to run fio test '{test}': {stderr}",
+        )
 
-    return loads(stdout)
+    return jstdout
 
 
 def run_benchmark(self, pool):
-    # Runtime imports
-    import time
-    from datetime import datetime
-
-    # Define the current test format
-    TEST_FORMAT = 1
-
-    time.sleep(2)
-
     # Phase 0 - connect to databases
-    try:
-        db_conn, db_cur = open_database(config)
-    except Exception:
-        print("FATAL - failed to connect to Postgres")
-        raise Exception
-
     try:
         zkhandler = ZKHandler(config)
         zkhandler.connect()
     except Exception:
-        print("FATAL - failed to connect to Zookeeper")
-        raise Exception
+        fail(
+            self,
+            "Failed to connect to Zookeeper",
+        )
 
     cur_time = datetime.now().isoformat(timespec="seconds")
     cur_primary = zkhandler.read("base.config.primary_node")
-    job_name = "{}_{}".format(cur_time, cur_primary)
+    job_name = f"{cur_time}_{cur_primary}"
 
-    print("Starting storage benchmark '{}' on pool '{}'".format(job_name, pool))
+    current_stage = 0
+    total_stages = 13
+    start(
+        self,
+        f"Running storage benchmark '{job_name}' on pool '{pool}'",
+        current=current_stage,
+        total=total_stages,
+    )
+
+    try:
+        db_conn, db_cur = open_database(config)
+    except Exception:
+        cleanup(
+            job_name,
+            db_conn=None,
+            db_cur=None,
+            zkhandler=zkhandler,
+        )
+        fail(
+            self,
+            "Failed to connect to Postgres",
+        )
+
+    current_stage += 1
+    update(
+        self,
+        "Storing running status in database",
+        current=current_stage,
+        total=total_stages,
+    )
 
-    print("Storing running status for job '{}' in database".format(job_name))
     try:
         query = "INSERT INTO storage_benchmarks (job, test_format, result) VALUES (%s, %s, %s);"
         args = (
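A note on the new total_stages = 13 above: it matches the stage accounting in run_benchmark(), which increments current_stage once for storing the running status, once for volume creation, once per test in the eight-test matrix, once for volume cleanup, once for storing results, and once for the final finish. A quick arithmetic check (variable names here are illustrative, not from the source):

    # Sanity check of the stage count used in run_benchmark(); assumes the
    # 8-test matrix described in the file's header comment.
    fixed_stages = 5  # store status, create volume, cleanup volume, store results, finish
    num_tests = 8
    assert fixed_stages + num_tests == 13  # == total_stages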
@@ -310,20 +334,21 @@ def run_benchmark(self, pool):
         db_cur.execute(query, args)
         db_conn.commit()
     except Exception as e:
-        raise BenchmarkError(
-            "Failed to store running status: {}".format(e),
-            job_name=job_name,
+        cleanup(
+            job_name,
             db_conn=db_conn,
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
+        fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
 
-    # Phase 1 - volume preparation
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 1, "total": 3, "status": "Creating benchmark volume"},
+    current_stage += 1
+    update(
+        self,
+        "Creating benchmark volume",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
 
     prepare_benchmark_volume(
         pool,
@@ -334,14 +359,16 @@ def run_benchmark(self, pool):
     )
 
     # Phase 2 - benchmark run
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 2, "total": 3, "status": "Running fio benchmarks on volume"},
-    )
-    time.sleep(1)
-
     results = dict()
     for test in test_matrix:
+        current_stage += 1
+        update(
+            self,
+            f"Running benchmark job '{test}'",
+            current=current_stage,
+            total=total_stages,
+        )
         results[test] = run_benchmark_job(
             test,
             pool,
@@ -352,11 +379,13 @@ def run_benchmark(self, pool):
     )
 
     # Phase 3 - cleanup
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 3, "total": 3, "status": "Cleaning up and storing results"},
+    current_stage += 1
+    update(
+        self,
+        "Cleaning up benchmark volume",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
 
     cleanup_benchmark_volume(
         pool,
@@ -366,27 +395,39 @@ def run_benchmark(self, pool):
         zkhandler=zkhandler,
     )
 
-    print("Storing result of tests for job '{}' in database".format(job_name))
+    current_stage += 1
+    update(
+        self,
+        "Storing results in database",
+        current=current_stage,
+        total=total_stages,
+    )
+
     try:
         query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;"
         args = (dumps(results), job_name)
         db_cur.execute(query, args)
         db_conn.commit()
     except Exception as e:
-        raise BenchmarkError(
-            "Failed to store test results: {}".format(e),
-            job_name=job_name,
+        cleanup(
+            job_name,
             db_conn=db_conn,
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
+        fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
 
-    close_database(db_conn, db_cur)
-    zkhandler.disconnect()
-    del zkhandler
+    cleanup(
+        job_name,
+        db_conn=db_conn,
+        db_cur=db_cur,
+        zkhandler=zkhandler,
+    )
 
-    return {
-        "status": "Storage benchmark '{}' completed successfully.",
-        "current": 3,
-        "total": 3,
-    }
+    current_stage += 1
+    return finish(
+        self,
+        f"Storage benchmark '{job_name}' completed successfully.",
+        current=current_stage,
+        total=total_stages,
+    )