Add Celery queue routing for tasks

By default, tasks will continue to run as they did before, on the primary
coordinator's task runner. However, this opens up the possibility of defining
additional tasks that will run on other nodes or coordinators.
Joshua Boniface 2023-11-04 14:29:59 -04:00
parent 30d7e49401
commit a6caac1b78
1 changed file with 33 additions and 3 deletions
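The routing behavior described in the commit message can be illustrated with a minimal, standalone sketch of the decision logic. This is not the commit's code: the ZooKeeper lookup of the primary node is replaced by a fixed default, and the hostnames "hv1"/"hv2", the "run_on" kwarg, and the pool name are illustrative assumptions.

# Minimal sketch of the queue-selection rule added below: if the caller's
# routing_key names a kwarg, that kwarg's value picks the destination queue;
# otherwise the task falls back to the primary coordinator.
def pick_queue(name, args, kwargs, options, primary="hv1"):
    key = options.get("routing_key", "default")
    if key != "default" and key in kwargs:
        return kwargs[key]
    return primary


# Pinned to a specific coordinator via a matching kwarg:
print(pick_queue("storage.benchmark", (), {"pool": "testpool", "run_on": "hv2"},
                 {"routing_key": "run_on"}))          # -> hv2
# No routing kwarg: falls back to the primary coordinator:
print(pick_queue("storage.benchmark", (), {"pool": "testpool"}, {}))  # -> hv1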

@@ -24,6 +24,7 @@ import flask
 from functools import wraps
 from flask_restful import Resource, Api, reqparse, abort
 from celery import Celery
+from kombu import Queue
 
 from pvcapid.Daemon import config, strtobool, API_VERSION
@@ -56,7 +57,36 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format(
     config["database_port"],
     config["database_name"],
 )
-app.config.database_engine_options = {"echo": True}
+# Set up queue routing
+app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True}
+app.config["CELERY_QUEUES"] = tuple(
+    [Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]]
+)
+
+
+def route_task(name, args, kwargs, options, task=None, **kw):
+    from daemon_lib.common import getPrimaryNode
+    from daemon_lib.zkhandler import ZKConnection
+
+    @ZKConnection(config)
+    def get_primary_node(zkhandler):
+        return getPrimaryNode(zkhandler)
+
+    print("----")
+    print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
+
+    if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
+        run_on = kwargs[options["routing_key"]]
+    else:
+        run_on = get_primary_node()
+
+    print(f"Selected Celery worker: {run_on}")
+    print("----")
+
+    return run_on
+
+
+app.config["CELERY_ROUTES"] = (route_task,)
 
 # Set up SQLAlchemy backend
 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
@@ -159,7 +189,7 @@ def Authenticator(function):
 #
 # Job functions
 #
-@celery.task(bind=True)
+@celery.task(name="provisioner.create", bind=True)
 def create_vm(
     self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[]
 ):
@@ -173,7 +203,7 @@ def create_vm(
 )


-@celery.task(bind=True)
+@celery.task(name="storage.benchmark", bind=True)
 def run_benchmark(self, pool):
     return api_benchmark.run_benchmark(self, pool)
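Because the tasks now carry explicit names ("provisioner.create", "storage.benchmark"), a client could also dispatch them by name and target a specific coordinator's queue directly, bypassing the router. The following is a hedged sketch, not part of this commit: the coordinator hostname "hv2" and pool name "testpool" are assumptions, and it presumes the module's celery object is importable.

# Hedged sketch: send the named benchmark task straight to one coordinator's
# per-node queue (declared via CELERY_QUEUES above). Names are illustrative.
from pvcapid.flaskapi import celery

task = celery.send_task(
    "storage.benchmark",   # task name added by this commit
    args=("testpool",),    # the 'pool' argument of run_benchmark
    queue="hv2",           # one of the per-coordinator queues
)
print(task.id)

For this to be picked up, a worker on that node would need to consume its hostname-named queue, for example by starting the Celery worker with its -Q/--queues option set to the node's own hostname.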