Compare commits

...

10 Commits

Author SHA1 Message Date
460a2dd09f Bump version to 0.9.82 2023-11-25 15:38:50 -05:00
c30ea66d14 Add swagger document to gitignore 2023-11-25 15:37:01 -05:00
24cabd3b99 Fix missing result_backend on Debian 10/11
For whatever reason, a Celery worker on <5.2.x was not picking these up.
Move them back to the root of the module so they are properly picked up
on these older versions but still prevents calling the routing functions
during an API doc generation.
2023-11-25 15:35:25 -05:00
3e001b08b6 Bump version to 0.9.81 2023-11-17 01:29:41 -05:00
7f6b3ebb6b Update test script 2023-11-17 01:03:56 -05:00
91858fbd20 Update manage script 2023-11-16 23:12:53 -05:00
b66cfb07d8 Isolate cluster-dependent Celery startup
Avoids calling unworkable functions when generating API docs etc. by
isolating them into a Celery startup function called by Daemon.py.

Also update to Celery 4+ settings format.
2023-11-16 20:32:29 -05:00
9885914abd Remove stray periods from messages 2023-11-16 19:56:24 -05:00
e8da3714c0 Convert benchmark to use new Celery step structure 2023-11-16 19:36:23 -05:00
4d23d0419c Fix total stage count 2023-11-16 18:41:43 -05:00
12 changed files with 254 additions and 162 deletions

2
.gitignore vendored
View File

@ -1,6 +1,8 @@
*.pyc
*.tmp
*.swp
# Ignore swagger output (for pvc-docs instead)
swagger.json
# Ignore build artifacts
debian/pvc-*/
debian/*.log

View File

@ -1 +1 @@
0.9.80
0.9.82

View File

@ -1,5 +1,25 @@
## PVC Changelog
###### [v0.9.82](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.82)
* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).
###### [v0.9.81](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.81)
**Breaking Changes:** This large release features a number of major changes. While these should all be a seamless transition, the behaviour of several commands and the backend system for handling them has changed significantly, along with new dependencies from PVC Ansible. A full cluster configuration update via `pvc.yml` is recommended after installing this version. Redis is replaced with KeyDB on coordinator nodes as a Celery backend; this transition will be handled gracefully by the `pvc-ansible` playbooks, though note that KeyDB will be exposed on the Upstream interface. The Celery worker system is renamed `pvcworkerd`, is now active on all nodes (coordinator and non-coordinator), and is expanded to encompass several commands that previously used a similar, custom setup within the node daemons, including "pvc vm flush-locks" and all "pvc storage osd" tasks. The previously-mentioned CLI commands now all feature "--wait"/"--no-wait" flags, with wait showing a progress bar and status output of the task run. The "pvc cluster task" command can now used for viewing all task types, replacing the previously-custom/specific "pvc provisioner status" command. All example provisioner scripts have been updated to leverage new helper functions in the Celery system; while updating these is optional, an administrator is recommended to do so for optimal log output behaviour.
* [CLI Client] Fixes "--live" argument handling and duplicate restart prompts.
* [All] Adds support for multiple OSDs on individual disks (NVMe workloads).
* [All] Corrects and updates OSD replace, refresh, remove, and add functionality; replace no longer purges.
* [All] Switches to KeyDB (multi-master) instead of Redis and adds node monitoring plugin.
* [All] Replaces Zookeeper/Node Daemon-based message passing and task handling with pvcworkerd Celery workers on all nodes; increases worker concurrency to 3 (per node).
* [All] Moves all task-like functions to Celery and updates existing Celery tasks to use new helpers and ID system.
* [CLI Client] Adds "--wait/--no-wait" options with progress bars to all Celery-based tasks, "--wait" default; adds a standardized task interface under "pvc cluster task".
* [Node Daemon] Cleans up the fencing handler and related functions.
* [Node Daemon] Fixes bugs with VM memory reporting during keepalives.
* [Node Daemon] Fixes a potential race condition during primary/secondary transition by backgrounding systemctl commands.
* [API Daemon] Updates example provisioner plugins to use new Celery functions.
###### [v0.9.80](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.80)
* [CLI] Improves CLI performance by not loading "pkg_resources" until needed

View File

@ -27,7 +27,7 @@ from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
# Daemon version
version = "0.9.80"
version = "0.9.82"
# API version
API_VERSION = 1.0
@ -155,6 +155,7 @@ def entrypoint():
print("|----------------------------------------------------------|")
print("")
pvc_api.celery_startup()
pvc_api.app.run(
config["listen_address"],
config["listen_port"],

View File

@ -22,16 +22,22 @@
import psycopg2
import psycopg2.extras
from datetime import datetime
from json import loads, dumps
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
# Define the current test format
TEST_FORMAT = 1
# We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 8GB with a 4M block size
# 2. A sequential write test of 8GB with a 4M block size
@ -104,15 +110,16 @@ benchmark_volume_size = "8G"
# Exceptions (used by Celery tasks)
#
class BenchmarkError(Exception):
"""
An exception that results from the Benchmark job.
"""
pass
def __init__(
self, message, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
self.message = message
if job_name is not None and db_conn is not None and db_cur is not None:
#
# Common functions
#
def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
if db_conn is not None and db_cur is not None:
# Clean up our dangling result
query = "DELETE FROM storage_benchmarks WHERE job = %s;"
args = (job_name,)
@ -120,16 +127,9 @@ class BenchmarkError(Exception):
db_conn.commit()
# Close the database connections cleanly
close_database(db_conn, db_cur)
if job_name is not None and zkhandler is not None:
if zkhandler is not None:
zkhandler.disconnect()
def __str__(self):
return str(self.message)
#
# Common functions
#
del zkhandler
# Database connections
@ -193,17 +193,18 @@ def prepare_benchmark_volume(
zkhandler, pool, benchmark_volume_name, benchmark_volume_size
)
if not retcode:
raise BenchmarkError(
'Failed to create volume "{}" on pool "{}": {}'.format(
benchmark_volume_name, pool, retmsg
),
job_name=job_name,
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f'Failed to create volume "{benchmark_volume_name}" on pool "{pool}": {retmsg}',
)
else:
print(retmsg)
log_info(None, retmsg)
def cleanup_benchmark_volume(
@ -212,24 +213,25 @@ def cleanup_benchmark_volume(
# Remove the RBD volume
retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, benchmark_volume_name)
if not retcode:
raise BenchmarkError(
'Failed to remove volume "{}" on pool "{}": {}'.format(
benchmark_volume_name, pool, retmsg
),
job_name=job_name,
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f'Failed to remove volume "{benchmark_volume_name}" from pool "{pool}": {retmsg}',
)
else:
print(retmsg)
log_info(None, retmsg)
def run_benchmark_job(
test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
test_spec = test_matrix[test]
print("Running test '{}'".format(test))
log_info(None, f"Running test '{test}'")
fio_cmd = """
fio \
--name={test} \
@ -255,51 +257,73 @@ def run_benchmark_job(
rw=test_spec["rw"],
)
print("Running fio job: {}".format(" ".join(fio_cmd.split())))
log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
try:
jstdout = loads(stdout)
if retcode:
raise BenchmarkError(
"Failed to run fio test: {}".format(stderr),
job_name=job_name,
raise
except Exception:
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(
None,
f"Failed to run fio test '{test}': {stderr}",
)
return loads(stdout)
return jstdout
def run_benchmark(self, pool):
# Runtime imports
import time
from datetime import datetime
# Define the current test format
TEST_FORMAT = 1
time.sleep(2)
# Phase 0 - connect to databases
try:
db_conn, db_cur = open_database(config)
except Exception:
print("FATAL - failed to connect to Postgres")
raise Exception
try:
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
print("FATAL - failed to connect to Zookeeper")
raise Exception
fail(
self,
"Failed to connect to Zookeeper",
)
cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node")
job_name = "{}_{}".format(cur_time, cur_primary)
job_name = f"{cur_time}_{cur_primary}"
print("Starting storage benchmark '{}' on pool '{}'".format(job_name, pool))
current_stage = 0
total_stages = 13
start(
self,
f"Running storage benchmark '{job_name}' on pool '{pool}'",
current=current_stage,
total=total_stages,
)
try:
db_conn, db_cur = open_database(config)
except Exception:
cleanup(
job_name,
db_conn=None,
db_cur=None,
zkhandler=zkhandler,
)
fail(
self,
"Failed to connect to Postgres",
)
current_stage += 1
update(
self,
"Storing running status in database",
current=current_stage,
total=total_stages,
)
print("Storing running status for job '{}' in database".format(job_name))
try:
query = "INSERT INTO storage_benchmarks (job, test_format, result) VALUES (%s, %s, %s);"
args = (
@ -310,20 +334,21 @@ def run_benchmark(self, pool):
db_cur.execute(query, args)
db_conn.commit()
except Exception as e:
raise BenchmarkError(
"Failed to store running status: {}".format(e),
job_name=job_name,
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
# Phase 1 - volume preparation
self.update_state(
state="RUNNING",
meta={"current": 1, "total": 3, "status": "Creating benchmark volume"},
current_stage += 1
update(
self,
"Creating benchmark volume",
current=current_stage,
total=total_stages,
)
time.sleep(1)
prepare_benchmark_volume(
pool,
@ -334,14 +359,16 @@ def run_benchmark(self, pool):
)
# Phase 2 - benchmark run
self.update_state(
state="RUNNING",
meta={"current": 2, "total": 3, "status": "Running fio benchmarks on volume"},
)
time.sleep(1)
results = dict()
for test in test_matrix:
current_stage += 1
update(
self,
f"Running benchmark job '{test}'",
current=current_stage,
total=total_stages,
)
results[test] = run_benchmark_job(
test,
pool,
@ -352,11 +379,13 @@ def run_benchmark(self, pool):
)
# Phase 3 - cleanup
self.update_state(
state="RUNNING",
meta={"current": 3, "total": 3, "status": "Cleaning up and storing results"},
current_stage += 1
update(
self,
"Cleaning up venchmark volume",
current=current_stage,
total=total_stages,
)
time.sleep(1)
cleanup_benchmark_volume(
pool,
@ -366,27 +395,39 @@ def run_benchmark(self, pool):
zkhandler=zkhandler,
)
print("Storing result of tests for job '{}' in database".format(job_name))
current_stage += 1
update(
self,
"Storing results in database",
current=current_stage,
total=total_stages,
)
try:
query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;"
args = (dumps(results), job_name)
db_cur.execute(query, args)
db_conn.commit()
except Exception as e:
raise BenchmarkError(
"Failed to store test results: {}".format(e),
job_name=job_name,
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
cleanup(
job_name,
db_conn=db_conn,
db_cur=db_cur,
zkhandler=zkhandler,
)
close_database(db_conn, db_cur)
zkhandler.disconnect()
del zkhandler
return {
"status": "Storage benchmark '{}' completed successfully.",
"current": 3,
"total": 3,
}
current_stage += 1
return finish(
self,
f"Storage benchmark {job_name} completed successfully",
current=current_stage,
total=total_stages,
)

View File

@ -57,64 +57,6 @@ from flask_sqlalchemy import SQLAlchemy
# Create Flask app and set config values
app = flask.Flask(__name__)
celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
)
app.config["CELERY_BROKER_URL"] = celery_task_uri
app.config["CELERY_RESULT_BACKEND"] = celery_task_uri
# Set up Celery queues
@ZKConnection(config)
def get_all_nodes(zkhandler):
_, all_nodes = get_node_list(zkhandler, None)
return [n["name"] for n in all_nodes]
@ZKConnection(config)
def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
app.config["CELERY_QUEUES"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
else:
run_on = get_primary_node()
print(f"Selected Celery worker: {run_on}")
print("----")
return run_on
app.config["CELERY_ROUTES"] = (route_task,)
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
)
return task
# Set up SQLAlchemy backend
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
@ -144,15 +86,77 @@ blueprint = flask.Blueprint("api", __name__, url_prefix="/api/v1")
api = Api(blueprint)
app.register_blueprint(blueprint)
# Set up Celery queues
@ZKConnection(config)
def get_all_nodes(zkhandler):
_, all_nodes = get_node_list(zkhandler, None)
return [n["name"] for n in all_nodes]
@ZKConnection(config)
def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
else:
run_on = get_primary_node()
print(f"Selected Celery worker: {run_on}")
print("----")
return run_on
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
)
return task
# Create celery definition
celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
)
celery = Celery(
app.name,
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
app.config["broker_url"] = celery_task_uri
app.config["result_backend"] = celery_task_uri
celery.conf.update(app.config)
def celery_startup():
"""
Runs when the API daemon starts, but not the Celery workers or the API doc generator
"""
app.config["task_queues"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
app.config["task_routes"] = (route_task,)
celery.conf.update(app.config)
#
# Custom decorators
#

View File

@ -220,7 +220,7 @@ def create_vm(
celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
):
current_stage = 0
total_stages = 10
total_stages = 11
start(
celery,
f"Provisioning new VM '{vm_name}' with profile '{vm_profile}'",
@ -373,7 +373,7 @@ def create_vm(
if pvc_vm.searchClusterByName(zkhandler, vm_name):
fail(
celery,
f"A VM with the name '{vm_name}' already exists in the cluster.",
f"A VM with the name '{vm_name}' already exists in the cluster",
exception=ClusterError,
)
@ -416,7 +416,7 @@ def create_vm(
]:
fail(
celery,
f'The network VNI "{vni}" is not present on the cluster.',
f'The network VNI "{vni}" is not present on the cluster',
exception=ClusterError,
)
@ -432,7 +432,7 @@ def create_vm(
if not volume_data:
fail(
celery,
f"The source volume {volume['pool']}/{volume['source_volume']} could not be found.",
f"The source volume {volume['pool']}/{volume['source_volume']} could not be found",
exception=ClusterError,
)
if not volume["pool"] in pools:
@ -463,7 +463,7 @@ def create_vm(
except Exception:
fail(
celery,
f'Pool "{pool}" is not present on the cluster.',
f'Pool "{pool}" is not present on the cluster',
exception=ClusterError,
)
pool_free_space_gb = int(
@ -474,7 +474,7 @@ def create_vm(
if pool_vm_usage_gb >= pool_free_space_gb:
fail(
celery,
f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB.',
f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB',
exception=ClusterError,
)

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="pvc",
version="0.9.80",
version="0.9.82",
packages=["pvc.cli", "pvc.lib"],
install_requires=[
"Click",

24
debian/changelog vendored
View File

@ -1,3 +1,27 @@
pvc (0.9.82-0) unstable; urgency=high
* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).
-- Joshua M. Boniface <joshua@boniface.me> Sat, 25 Nov 2023 15:38:50 -0500
pvc (0.9.81-0) unstable; urgency=high
**Breaking Changes:** This large release features a number of major changes. While these should all be a seamless transition, the behaviour of several commands and the backend system for handling them has changed significantly, along with new dependencies from PVC Ansible. A full cluster configuration update via `pvc.yml` is recommended after installing this version. Redis is replaced with KeyDB on coordinator nodes as a Celery backend; this transition will be handled gracefully by the `pvc-ansible` playbooks, though note that KeyDB will be exposed on the Upstream interface. The Celery worker system is renamed `pvcworkerd`, is now active on all nodes (coordinator and non-coordinator), and is expanded to encompass several commands that previously used a similar, custom setup within the node daemons, including "pvc vm flush-locks" and all "pvc storage osd" tasks. The previously-mentioned CLI commands now all feature "--wait"/"--no-wait" flags, with wait showing a progress bar and status output of the task run. The "pvc cluster task" command can now used for viewing all task types, replacing the previously-custom/specific "pvc provisioner status" command. All example provisioner scripts have been updated to leverage new helper functions in the Celery system; while updating these is optional, an administrator is recommended to do so for optimal log output behaviour.
* [CLI Client] Fixes "--live" argument handling and duplicate restart prompts.
* [All] Adds support for multiple OSDs on individual disks (NVMe workloads).
* [All] Corrects and updates OSD replace, refresh, remove, and add functionality; replace no longer purges.
* [All] Switches to KeyDB (multi-master) instead of Redis and adds node monitoring plugin.
* [All] Replaces Zookeeper/Node Daemon-based message passing and task handling with pvcworkerd Celery workers on all nodes; increases worker concurrency to 3 (per node).
* [All] Moves all task-like functions to Celery and updates existing Celery tasks to use new helpers and ID system.
* [CLI Client] Adds "--wait/--no-wait" options with progress bars to all Celery-based tasks, "--wait" default; adds a standardized task interface under "pvc cluster task".
* [Node Daemon] Cleans up the fencing handler and related functions.
* [Node Daemon] Fixes bugs with VM memory reporting during keepalives.
* [Node Daemon] Fixes a potential race condition during primary/secondary transition by backgrounding systemctl commands.
* [API Daemon] Updates example provisioner plugins to use new Celery functions.
-- Joshua M. Boniface <joshua@boniface.me> Fri, 17 Nov 2023 01:29:41 -0500
pvc (0.9.80-0) unstable; urgency=high
* [CLI] Improves CLI performance by not loading "pkg_resources" until needed

View File

@ -7,7 +7,7 @@ VERSION="$( head -1 debian/changelog | awk -F'[()-]' '{ print $2 }' )"
pushd $( git rev-parse --show-toplevel ) &>/dev/null
pushd api-daemon &>/dev/null
export PVC_CONFIG_FILE="./pvcapid.sample.yaml"
./pvcapid-manage.py db migrate -m "PVC version ${VERSION}"
./pvcapid-manage.py db upgrade
./pvcapid-manage_flask.py db migrate -m "PVC version ${VERSION}"
./pvcapid-manage_flask.py db upgrade
popd &>/dev/null
popd &>/dev/null

View File

@ -49,7 +49,7 @@ import re
import json
# Daemon version
version = "0.9.80"
version = "0.9.82"
##########################################################

View File

@ -78,9 +78,9 @@ _pvc vm tag get testx
_pvc vm list --tag mytag
_pvc vm tag remove testx mytag
_pvc vm network get testx
_pvc vm vcpu set --no-restart testx 4
_pvc vm vcpu set --no-restart testx 1
_pvc vm vcpu get testx
_pvc vm memory set --no-restart testx 4096
_pvc vm memory set --no-restart testx 1024
_pvc vm memory get testx
_pvc vm vcpu set --no-restart testx 2
_pvc vm memory set testx 2048 --restart --yes