diff --git a/api-daemon/pvcapid/Daemon.py b/api-daemon/pvcapid/Daemon.py index 2429d98a..5c9ee94e 100755 --- a/api-daemon/pvcapid/Daemon.py +++ b/api-daemon/pvcapid/Daemon.py @@ -155,6 +155,7 @@ def entrypoint(): print("|----------------------------------------------------------|") print("") + pvc_api.celery_startup() pvc_api.app.run( config["listen_address"], config["listen_port"], diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 68121927..6e4d5e81 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -57,64 +57,6 @@ from flask_sqlalchemy import SQLAlchemy # Create Flask app and set config values app = flask.Flask(__name__) -celery_task_uri = "redis://{}:{}{}".format( - config["queue_host"], config["queue_port"], config["queue_path"] -) -app.config["CELERY_BROKER_URL"] = celery_task_uri -app.config["CELERY_RESULT_BACKEND"] = celery_task_uri - - -# Set up Celery queues -@ZKConnection(config) -def get_all_nodes(zkhandler): - _, all_nodes = get_node_list(zkhandler, None) - return [n["name"] for n in all_nodes] - - -@ZKConnection(config) -def get_primary_node(zkhandler): - return getPrimaryNode(zkhandler) - - -app.config["CELERY_QUEUES"] = tuple( - [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()] -) - - -# Set up Celery queue routing -def route_task(name, args, kwargs, options, task=None, **kw): - print("----") - print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") - - # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue - if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): - run_on = kwargs[options["routing_key"]] - if run_on == "primary": - run_on = get_primary_node() - # Otherwise, use the primary node - else: - run_on = get_primary_node() - - print(f"Selected Celery worker: {run_on}") - print("----") - - return run_on - - -app.config["CELERY_ROUTES"] = (route_task,) - - -# Set up Celery task ID generator -# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID. -def run_celery_task(task_def, **kwargs): - task_id = str(uuid4()).split("-")[0] - task = task_def.apply_async( - (), - kwargs, - task_id=task_id, - ) - return task - # Set up SQLAlchemy backend app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False @@ -144,14 +86,72 @@ blueprint = flask.Blueprint("api", __name__, url_prefix="/api/v1") api = Api(blueprint) app.register_blueprint(blueprint) + +# Set up Celery queues +@ZKConnection(config) +def get_all_nodes(zkhandler): + _, all_nodes = get_node_list(zkhandler, None) + return [n["name"] for n in all_nodes] + + +@ZKConnection(config) +def get_primary_node(zkhandler): + return getPrimaryNode(zkhandler) + + +# Set up Celery queue routing +def route_task(name, args, kwargs, options, task=None, **kw): + print("----") + print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") + + # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue + if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): + run_on = kwargs[options["routing_key"]] + if run_on == "primary": + run_on = get_primary_node() + # Otherwise, use the primary node + else: + run_on = get_primary_node() + + print(f"Selected Celery worker: {run_on}") + print("----") + + return run_on + + +# Set up Celery task ID generator +# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID. +def run_celery_task(task_def, **kwargs): + task_id = str(uuid4()).split("-")[0] + task = task_def.apply_async( + (), + kwargs, + task_id=task_id, + ) + return task + + # Create celery definition +celery_task_uri = "redis://{}:{}{}".format( + config["queue_host"], config["queue_port"], config["queue_path"] +) celery = Celery( app.name, broker=celery_task_uri, result_backend=celery_task_uri, result_extended=True, ) -celery.conf.update(app.config) + + +def celery_startup(): + app.config["CELERY_broker_url"] = celery_task_uri + app.config["result_backend"] = celery_task_uri + app.config["task_queues"] = tuple( + [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()] + ) + app.config["task_routes"] = (route_task,) + celery.conf.update(app.config) + # # Custom decorators