diff --git a/api-daemon/pvcapid/benchmark.py b/api-daemon/pvcapid/benchmark.py index a419f972..d24b6fd4 100755 --- a/api-daemon/pvcapid/benchmark.py +++ b/api-daemon/pvcapid/benchmark.py @@ -22,16 +22,22 @@ import psycopg2 import psycopg2.extras +from datetime import datetime from json import loads, dumps from pvcapid.Daemon import config from daemon_lib.zkhandler import ZKHandler +from daemon_lib.celery import start, fail, log_info, update, finish import daemon_lib.common as pvc_common import daemon_lib.ceph as pvc_ceph +# Define the current test format +TEST_FORMAT = 1 + + # We run a total of 8 tests, to give a generalized idea of performance on the cluster: # 1. A sequential read test of 8GB with a 4M block size # 2. A sequential write test of 8GB with a 4M block size @@ -104,27 +110,7 @@ benchmark_volume_size = "8G" # Exceptions (used by Celery tasks) # class BenchmarkError(Exception): - """ - An exception that results from the Benchmark job. - """ - - def __init__( - self, message, job_name=None, db_conn=None, db_cur=None, zkhandler=None - ): - self.message = message - if job_name is not None and db_conn is not None and db_cur is not None: - # Clean up our dangling result - query = "DELETE FROM storage_benchmarks WHERE job = %s;" - args = (job_name,) - db_cur.execute(query, args) - db_conn.commit() - # Close the database connections cleanly - close_database(db_conn, db_cur) - if job_name is not None and zkhandler is not None: - zkhandler.disconnect() - - def __str__(self): - return str(self.message) + pass # @@ -132,6 +118,20 @@ class BenchmarkError(Exception): # +def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None): + if db_conn is not None and db_cur is not None: + # Clean up our dangling result + query = "DELETE FROM storage_benchmarks WHERE job = %s;" + args = (job_name,) + db_cur.execute(query, args) + db_conn.commit() + # Close the database connections cleanly + close_database(db_conn, db_cur) + if zkhandler is not None: + zkhandler.disconnect() + del zkhandler + + # Database connections def open_database(config): conn = psycopg2.connect( @@ -193,17 +193,18 @@ def prepare_benchmark_volume( zkhandler, pool, benchmark_volume_name, benchmark_volume_size ) if not retcode: - raise BenchmarkError( - 'Failed to create volume "{}" on pool "{}": {}'.format( - benchmark_volume_name, pool, retmsg - ), - job_name=job_name, + cleanup( + job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler, ) + fail( + None, + f'Failed to create volume "{benchmark_volume_name}" on pool "{pool}": {retmsg}', + ) else: - print(retmsg) + log_info(None, retmsg) def cleanup_benchmark_volume( @@ -212,24 +213,25 @@ def cleanup_benchmark_volume( # Remove the RBD volume retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, benchmark_volume_name) if not retcode: - raise BenchmarkError( - 'Failed to remove volume "{}" on pool "{}": {}'.format( - benchmark_volume_name, pool, retmsg - ), - job_name=job_name, + cleanup( + job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler, ) + fail( + None, + f'Failed to remove volume "{benchmark_volume_name}" from pool "{pool}": {retmsg}', + ) else: - print(retmsg) + log_info(None, retmsg) def run_benchmark_job( test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None ): test_spec = test_matrix[test] - print("Running test '{}'".format(test)) + log_info(None, f"Running test '{test}'") fio_cmd = """ fio \ --name={test} \ @@ -255,51 +257,73 @@ def run_benchmark_job( rw=test_spec["rw"], ) - print("Running fio job: {}".format(" ".join(fio_cmd.split()))) + log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split()))) retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd) - if retcode: - raise BenchmarkError( - "Failed to run fio test: {}".format(stderr), - job_name=job_name, + try: + jstdout = loads(stdout) + if retcode: + raise + except Exception: + cleanup( + job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler, ) + fail( + None, + f"Failed to run fio test '{test}': {stderr}", + ) - return loads(stdout) + return jstdout def run_benchmark(self, pool): - # Runtime imports - import time - from datetime import datetime - - # Define the current test format - TEST_FORMAT = 1 - - time.sleep(2) - # Phase 0 - connect to databases - try: - db_conn, db_cur = open_database(config) - except Exception: - print("FATAL - failed to connect to Postgres") - raise Exception - try: zkhandler = ZKHandler(config) zkhandler.connect() except Exception: - print("FATAL - failed to connect to Zookeeper") - raise Exception + fail( + self, + "Failed to connect to Zookeeper", + ) cur_time = datetime.now().isoformat(timespec="seconds") cur_primary = zkhandler.read("base.config.primary_node") - job_name = "{}_{}".format(cur_time, cur_primary) + job_name = f"{cur_time}_{cur_primary}" - print("Starting storage benchmark '{}' on pool '{}'".format(job_name, pool)) + current_stage = 0 + total_stages = 13 + start( + self, + f"Running storage benchmark '{job_name}' on pool '{pool}'", + current=current_stage, + total=total_stages, + ) + + try: + db_conn, db_cur = open_database(config) + except Exception: + cleanup( + job_name, + db_conn=None, + db_cur=None, + zkhandler=zkhandler, + ) + fail( + self, + "Failed to connect to Postgres", + ) + + current_stage += 1 + update( + self, + "Storing running status in database", + current=current_stage, + total=total_stages, + ) - print("Storing running status for job '{}' in database".format(job_name)) try: query = "INSERT INTO storage_benchmarks (job, test_format, result) VALUES (%s, %s, %s);" args = ( @@ -310,20 +334,21 @@ def run_benchmark(self, pool): db_cur.execute(query, args) db_conn.commit() except Exception as e: - raise BenchmarkError( - "Failed to store running status: {}".format(e), - job_name=job_name, + cleanup( + job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler, ) + fail(self, f"Failed to store running status: {e}", exception=BenchmarkError) - # Phase 1 - volume preparation - self.update_state( - state="RUNNING", - meta={"current": 1, "total": 3, "status": "Creating benchmark volume"}, + current_stage += 1 + update( + self, + "Creating benchmark volume", + current=current_stage, + total=total_stages, ) - time.sleep(1) prepare_benchmark_volume( pool, @@ -334,14 +359,16 @@ def run_benchmark(self, pool): ) # Phase 2 - benchmark run - self.update_state( - state="RUNNING", - meta={"current": 2, "total": 3, "status": "Running fio benchmarks on volume"}, - ) - time.sleep(1) - results = dict() for test in test_matrix: + current_stage += 1 + update( + self, + f"Running benchmark job '{test}'", + current=current_stage, + total=total_stages, + ) + results[test] = run_benchmark_job( test, pool, @@ -352,11 +379,13 @@ def run_benchmark(self, pool): ) # Phase 3 - cleanup - self.update_state( - state="RUNNING", - meta={"current": 3, "total": 3, "status": "Cleaning up and storing results"}, + current_stage += 1 + update( + self, + "Cleaning up venchmark volume", + current=current_stage, + total=total_stages, ) - time.sleep(1) cleanup_benchmark_volume( pool, @@ -366,27 +395,39 @@ def run_benchmark(self, pool): zkhandler=zkhandler, ) - print("Storing result of tests for job '{}' in database".format(job_name)) + current_stage += 1 + update( + self, + "Storing results in database", + current=current_stage, + total=total_stages, + ) + try: query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;" args = (dumps(results), job_name) db_cur.execute(query, args) db_conn.commit() except Exception as e: - raise BenchmarkError( - "Failed to store test results: {}".format(e), - job_name=job_name, + cleanup( + job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler, ) + fail(self, f"Failed to store test results: {e}", exception=BenchmarkError) - close_database(db_conn, db_cur) - zkhandler.disconnect() - del zkhandler + cleanup( + job_name, + db_conn=db_conn, + db_cur=db_cur, + zkhandler=zkhandler, + ) - return { - "status": "Storage benchmark '{}' completed successfully.", - "current": 3, - "total": 3, - } + current_stage += 1 + return finish( + self, + f"Storage benchmark {job_name} completed successfully.", + current=current_stage, + total=total_stages, + )