From a6caac1b781d1c48e7b49dd9e5ed249b2fd83fe4 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sat, 4 Nov 2023 14:29:59 -0400 Subject: [PATCH] Add Celery queue routing for tasks By default, tasks will continue to run as they did, on the primary coordinator's task runner. However this opens the possibility for defining more tasks that will run on other nodes or coordinators. --- api-daemon/pvcapid/flaskapi.py | 36 +++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index fee4933a..3d98c28a 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -24,6 +24,7 @@ import flask from functools import wraps from flask_restful import Resource, Api, reqparse, abort from celery import Celery +from kombu import Queue from pvcapid.Daemon import config, strtobool, API_VERSION @@ -56,7 +57,36 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format( config["database_port"], config["database_name"], ) -app.config.database_engine_options = {"echo": True} +# Set up queue routing +app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True} +app.config["CELERY_QUEUES"] = tuple( + [Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]] +) + + +def route_task(name, args, kwargs, options, task=None, **kw): + from daemon_lib.common import getPrimaryNode + from daemon_lib.zkhandler import ZKConnection + + @ZKConnection(config) + def get_primary_node(zkhandler): + return getPrimaryNode(zkhandler) + + print("----") + print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") + + if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): + run_on = kwargs[options["routing_key"]] + else: + run_on = get_primary_node() + + print(f"Selected Celery worker: {run_on}") + print("----") + + return run_on + + +app.config["CELERY_ROUTES"] = (route_task,) # Set up SQLAlchemy backend app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False @@ -159,7 +189,7 @@ def Authenticator(function): # # Job functions # -@celery.task(bind=True) +@celery.task(name="provisioner.create", bind=True) def create_vm( self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[] ): @@ -173,7 +203,7 @@ def create_vm( ) -@celery.task(bind=True) +@celery.task(name="storage.benchmark", bind=True) def run_benchmark(self, pool): return api_benchmark.run_benchmark(self, pool)