From af8a8d969efacd2f2445d2fdc67e0cf8c71bbcd9 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sat, 4 Nov 2023 15:05:07 -0400 Subject: [PATCH] Ensure queues are set up for non-coordinator nodes Allows a runner to operate on every possible node, not just coordinators, as OSDs or other things could be on any node. Also add more comments. --- api-daemon/pvcapid/flaskapi.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 3d98c28a..7de4e876 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -26,6 +26,10 @@ from flask_restful import Resource, Api, reqparse, abort from celery import Celery from kombu import Queue +from daemon_lib.common import getPrimaryNode +from daemon_lib.zkhandler import ZKConnection +from daemon_lib.node import get_list as get_node_list + from pvcapid.Daemon import config, strtobool, API_VERSION import pvcapid.helper as api_helper @@ -57,17 +61,24 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format( config["database_port"], config["database_name"], ) -# Set up queue routing + +# Set up Celery queues app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True} + + +@ZKConnection(config) +def get_all_nodes(zkhandler): + _, all_nodes = get_node_list(zkhandler, None) + return [n["name"] for n in all_nodes] + + app.config["CELERY_QUEUES"] = tuple( - [Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]] + [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()] ) +# Set up Celery queue routing def route_task(name, args, kwargs, options, task=None, **kw): - from daemon_lib.common import getPrimaryNode - from daemon_lib.zkhandler import ZKConnection - @ZKConnection(config) def get_primary_node(zkhandler): return getPrimaryNode(zkhandler) @@ -75,8 +86,10 @@ def route_task(name, args, kwargs, options, task=None, **kw): print("----") print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") + # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): run_on = kwargs[options["routing_key"]] + # Otherwise, use the primary node else: run_on = get_primary_node() @@ -122,7 +135,6 @@ app.register_blueprint(blueprint) celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"]) celery.conf.update(app.config) - # # Custom decorators #