diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 3d98c28a..7de4e876 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -26,6 +26,10 @@ from flask_restful import Resource, Api, reqparse, abort from celery import Celery from kombu import Queue +from daemon_lib.common import getPrimaryNode +from daemon_lib.zkhandler import ZKConnection +from daemon_lib.node import get_list as get_node_list + from pvcapid.Daemon import config, strtobool, API_VERSION import pvcapid.helper as api_helper @@ -57,17 +61,24 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format( config["database_port"], config["database_name"], ) -# Set up queue routing + +# Set up Celery queues app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True} + + +@ZKConnection(config) +def get_all_nodes(zkhandler): + _, all_nodes = get_node_list(zkhandler, None) + return [n["name"] for n in all_nodes] + + app.config["CELERY_QUEUES"] = tuple( - [Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]] + [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()] ) +# Set up Celery queue routing def route_task(name, args, kwargs, options, task=None, **kw): - from daemon_lib.common import getPrimaryNode - from daemon_lib.zkhandler import ZKConnection - @ZKConnection(config) def get_primary_node(zkhandler): return getPrimaryNode(zkhandler) @@ -75,8 +86,10 @@ def route_task(name, args, kwargs, options, task=None, **kw): print("----") print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") + # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): run_on = kwargs[options["routing_key"]] + # Otherwise, use the primary node else: run_on = get_primary_node() @@ -122,7 +135,6 @@ app.register_blueprint(blueprint) celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"]) celery.conf.update(app.config) - # # Custom decorators #