Port benchmark to ZKConnection

This commit is contained in:
Joshua Boniface 2021-05-29 00:24:53 -04:00
parent 1963f2c336
commit 2c0bafc313
1 changed files with 13 additions and 17 deletions

View File

@ -24,6 +24,8 @@ import psycopg2.extras
from pvcapid.Daemon import config from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKConnection
import daemon_lib.common as pvc_common import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph import daemon_lib.ceph as pvc_ceph
@ -35,7 +37,7 @@ class BenchmarkError(Exception):
""" """
An exception that results from the Benchmark job. An exception that results from the Benchmark job.
""" """
def __init__(self, message, cur_time=None, db_conn=None, db_cur=None, zk_conn=None): def __init__(self, message, cur_time=None, db_conn=None, db_cur=None, zkhandler=None):
self.message = message self.message = message
if cur_time is not None: if cur_time is not None:
# Clean up our dangling result # Clean up our dangling result
@ -45,7 +47,7 @@ class BenchmarkError(Exception):
db_conn.commit() db_conn.commit()
# Close the database connections cleanly # Close the database connections cleanly
close_database(db_conn, db_cur) close_database(db_conn, db_cur)
pvc_common.stopZKConnection(zk_conn) zkhandler.disconnect()
def __str__(self): def __str__(self):
return str(self.message) return str(self.message)
@ -101,7 +103,8 @@ def list_benchmarks(job=None):
return {'message': 'No benchmark found.'}, 404 return {'message': 'No benchmark found.'}, 404
def run_benchmark(self, pool): @ZKConnection(config)
def run_benchmark(self, zkhandler, pool):
# Runtime imports # Runtime imports
import time import time
import json import json
@ -120,12 +123,6 @@ def run_benchmark(self, pool):
print('FATAL - failed to connect to Postgres') print('FATAL - failed to connect to Postgres')
raise Exception raise Exception
try:
zk_conn = pvc_common.startZKConnection(config['coordinators'])
except Exception:
print('FATAL - failed to connect to Zookeeper')
raise Exception
print("Storing running status for job '{}' in database".format(cur_time)) print("Storing running status for job '{}' in database".format(cur_time))
try: try:
query = "INSERT INTO storage_benchmarks (job, result) VALUES (%s, %s);" query = "INSERT INTO storage_benchmarks (job, result) VALUES (%s, %s);"
@ -133,7 +130,7 @@ def run_benchmark(self, pool):
db_cur.execute(query, args) db_cur.execute(query, args)
db_conn.commit() db_conn.commit()
except Exception as e: except Exception as e:
raise BenchmarkError("Failed to store running status: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zk_conn=zk_conn) raise BenchmarkError("Failed to store running status: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
# Phase 1 - volume preparation # Phase 1 - volume preparation
self.update_state(state='RUNNING', meta={'current': 1, 'total': 3, 'status': 'Creating benchmark volume'}) self.update_state(state='RUNNING', meta={'current': 1, 'total': 3, 'status': 'Creating benchmark volume'})
@ -142,9 +139,9 @@ def run_benchmark(self, pool):
volume = 'pvcbenchmark' volume = 'pvcbenchmark'
# Create the RBD volume # Create the RBD volume
retcode, retmsg = pvc_ceph.add_volume(zk_conn, pool, volume, "8G") retcode, retmsg = pvc_ceph.add_volume(zkhandler, pool, volume, "8G")
if not retcode: if not retcode:
raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zk_conn=zk_conn) raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
else: else:
print(retmsg) print(retmsg)
@ -231,7 +228,7 @@ def run_benchmark(self, pool):
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd) retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
if retcode: if retcode:
raise BenchmarkError("Failed to run fio test: {}".format(stderr), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zk_conn=zk_conn) raise BenchmarkError("Failed to run fio test: {}".format(stderr), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
# Parse the terse results to avoid storing tons of junk # Parse the terse results to avoid storing tons of junk
# Reference: https://fio.readthedocs.io/en/latest/fio_doc.html#terse-output # Reference: https://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
@ -432,9 +429,9 @@ def run_benchmark(self, pool):
time.sleep(1) time.sleep(1)
# Remove the RBD volume # Remove the RBD volume
retcode, retmsg = pvc_ceph.remove_volume(zk_conn, pool, volume) retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, volume)
if not retcode: if not retcode:
raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zk_conn=zk_conn) raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
else: else:
print(retmsg) print(retmsg)
@ -445,8 +442,7 @@ def run_benchmark(self, pool):
db_cur.execute(query, args) db_cur.execute(query, args)
db_conn.commit() db_conn.commit()
except Exception as e: except Exception as e:
raise BenchmarkError("Failed to store test results: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zk_conn=zk_conn) raise BenchmarkError("Failed to store test results: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
close_database(db_conn, db_cur) close_database(db_conn, db_cur)
pvc_common.stopZKConnection(zk_conn)
return {'status': "Storage benchmark '{}' completed successfully.", 'current': 3, 'total': 3} return {'status': "Storage benchmark '{}' completed successfully.", 'current': 3, 'total': 3}