diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index fee4933a..3d98c28a 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -24,6 +24,7 @@ import flask from functools import wraps from flask_restful import Resource, Api, reqparse, abort from celery import Celery +from kombu import Queue from pvcapid.Daemon import config, strtobool, API_VERSION @@ -56,7 +57,36 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format( config["database_port"], config["database_name"], ) -app.config.database_engine_options = {"echo": True} +# Set up queue routing +app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True} +app.config["CELERY_QUEUES"] = tuple( + [Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]] +) + + +def route_task(name, args, kwargs, options, task=None, **kw): + from daemon_lib.common import getPrimaryNode + from daemon_lib.zkhandler import ZKConnection + + @ZKConnection(config) + def get_primary_node(zkhandler): + return getPrimaryNode(zkhandler) + + print("----") + print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") + + if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): + run_on = kwargs[options["routing_key"]] + else: + run_on = get_primary_node() + + print(f"Selected Celery worker: {run_on}") + print("----") + + return run_on + + +app.config["CELERY_ROUTES"] = (route_task,) # Set up SQLAlchemy backend app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False @@ -159,7 +189,7 @@ def Authenticator(function): # # Job functions # -@celery.task(bind=True) +@celery.task(name="provisioner.create", bind=True) def create_vm( self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[] ): @@ -173,7 +203,7 @@ def create_vm( ) -@celery.task(bind=True) +@celery.task(name="storage.benchmark", bind=True) def run_benchmark(self, pool): return api_benchmark.run_benchmark(self, pool)