Isolate cluster-dependent Celery startup
Avoids calling unworkable functions when generating API docs etc. by isolating them into a Celery startup function called by Daemon.py. Also update to Celery 4+ settings format.
This commit is contained in:
parent
9885914abd
commit
b66cfb07d8
|
@ -155,6 +155,7 @@ def entrypoint():
|
|||
print("|----------------------------------------------------------|")
|
||||
print("")
|
||||
|
||||
pvc_api.celery_startup()
|
||||
pvc_api.app.run(
|
||||
config["listen_address"],
|
||||
config["listen_port"],
|
||||
|
|
|
@ -57,64 +57,6 @@ from flask_sqlalchemy import SQLAlchemy
|
|||
|
||||
# Create Flask app and set config values
|
||||
app = flask.Flask(__name__)
|
||||
celery_task_uri = "redis://{}:{}{}".format(
|
||||
config["queue_host"], config["queue_port"], config["queue_path"]
|
||||
)
|
||||
app.config["CELERY_BROKER_URL"] = celery_task_uri
|
||||
app.config["CELERY_RESULT_BACKEND"] = celery_task_uri
|
||||
|
||||
|
||||
# Set up Celery queues
|
||||
@ZKConnection(config)
|
||||
def get_all_nodes(zkhandler):
|
||||
_, all_nodes = get_node_list(zkhandler, None)
|
||||
return [n["name"] for n in all_nodes]
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def get_primary_node(zkhandler):
|
||||
return getPrimaryNode(zkhandler)
|
||||
|
||||
|
||||
app.config["CELERY_QUEUES"] = tuple(
|
||||
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
|
||||
)
|
||||
|
||||
|
||||
# Set up Celery queue routing
|
||||
def route_task(name, args, kwargs, options, task=None, **kw):
|
||||
print("----")
|
||||
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
|
||||
|
||||
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
|
||||
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
|
||||
run_on = kwargs[options["routing_key"]]
|
||||
if run_on == "primary":
|
||||
run_on = get_primary_node()
|
||||
# Otherwise, use the primary node
|
||||
else:
|
||||
run_on = get_primary_node()
|
||||
|
||||
print(f"Selected Celery worker: {run_on}")
|
||||
print("----")
|
||||
|
||||
return run_on
|
||||
|
||||
|
||||
app.config["CELERY_ROUTES"] = (route_task,)
|
||||
|
||||
|
||||
# Set up Celery task ID generator
|
||||
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
|
||||
def run_celery_task(task_def, **kwargs):
|
||||
task_id = str(uuid4()).split("-")[0]
|
||||
task = task_def.apply_async(
|
||||
(),
|
||||
kwargs,
|
||||
task_id=task_id,
|
||||
)
|
||||
return task
|
||||
|
||||
|
||||
# Set up SQLAlchemy backend
|
||||
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||
|
@ -144,14 +86,72 @@ blueprint = flask.Blueprint("api", __name__, url_prefix="/api/v1")
|
|||
api = Api(blueprint)
|
||||
app.register_blueprint(blueprint)
|
||||
|
||||
|
||||
# Set up Celery queues
|
||||
@ZKConnection(config)
|
||||
def get_all_nodes(zkhandler):
|
||||
_, all_nodes = get_node_list(zkhandler, None)
|
||||
return [n["name"] for n in all_nodes]
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def get_primary_node(zkhandler):
|
||||
return getPrimaryNode(zkhandler)
|
||||
|
||||
|
||||
# Set up Celery queue routing
|
||||
def route_task(name, args, kwargs, options, task=None, **kw):
|
||||
print("----")
|
||||
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
|
||||
|
||||
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
|
||||
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
|
||||
run_on = kwargs[options["routing_key"]]
|
||||
if run_on == "primary":
|
||||
run_on = get_primary_node()
|
||||
# Otherwise, use the primary node
|
||||
else:
|
||||
run_on = get_primary_node()
|
||||
|
||||
print(f"Selected Celery worker: {run_on}")
|
||||
print("----")
|
||||
|
||||
return run_on
|
||||
|
||||
|
||||
# Set up Celery task ID generator
|
||||
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
|
||||
def run_celery_task(task_def, **kwargs):
|
||||
task_id = str(uuid4()).split("-")[0]
|
||||
task = task_def.apply_async(
|
||||
(),
|
||||
kwargs,
|
||||
task_id=task_id,
|
||||
)
|
||||
return task
|
||||
|
||||
|
||||
# Create celery definition
|
||||
celery_task_uri = "redis://{}:{}{}".format(
|
||||
config["queue_host"], config["queue_port"], config["queue_path"]
|
||||
)
|
||||
celery = Celery(
|
||||
app.name,
|
||||
broker=celery_task_uri,
|
||||
result_backend=celery_task_uri,
|
||||
result_extended=True,
|
||||
)
|
||||
celery.conf.update(app.config)
|
||||
|
||||
|
||||
def celery_startup():
|
||||
app.config["CELERY_broker_url"] = celery_task_uri
|
||||
app.config["result_backend"] = celery_task_uri
|
||||
app.config["task_queues"] = tuple(
|
||||
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
|
||||
)
|
||||
app.config["task_routes"] = (route_task,)
|
||||
celery.conf.update(app.config)
|
||||
|
||||
|
||||
#
|
||||
# Custom decorators
|
||||
|
|
Loading…
Reference in New Issue