Ensure queues are set up for non-coordinator nodes

Allows a runner to operate on every possible node, not just
coordinators, as OSDs or other things could be on any node.

Also add more comments.
This commit is contained in:
Joshua Boniface 2023-11-04 15:05:07 -04:00
parent a6caac1b78
commit af8a8d969e
1 changed files with 18 additions and 6 deletions

View File

@ -26,6 +26,10 @@ from flask_restful import Resource, Api, reqparse, abort
from celery import Celery
from kombu import Queue
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list
from pvcapid.Daemon import config, strtobool, API_VERSION
import pvcapid.helper as api_helper
@ -57,17 +61,24 @@ app.config["CELERY_RESULT_BACKEND"] = "db+postgresql://{}:{}@{}:{}/{}".format(
config["database_port"],
config["database_name"],
)
# Set up queue routing
# Set up Celery queues
app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True}
@ZKConnection(config)
def get_all_nodes(zkhandler):
_, all_nodes = get_node_list(zkhandler, None)
return [n["name"] for n in all_nodes]
app.config["CELERY_QUEUES"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in config["coordinators"]]
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
@ZKConnection(config)
def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
@ -75,8 +86,10 @@ def route_task(name, args, kwargs, options, task=None, **kw):
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
# Otherwise, use the primary node
else:
run_on = get_primary_node()
@ -122,7 +135,6 @@ app.register_blueprint(blueprint)
celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
#
# Custom decorators
#