Rework Flask API to route Celery tasks manually

Avoids needing to define any of these tasks here; they can all be
defined in the pvcworkerd code.
Joshua Boniface 2023-11-30 00:40:09 -05:00
parent 03a738f878
commit 0c0fb65c62
1 changed file with 27 additions and 200 deletions
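
The heart of the change is visible in the first hunk below: instead of importing task objects and calling task_def.apply_async(), the API now dispatches by registered task name with celery.send_task(), so the Flask process no longer needs any task definitions. A minimal sketch of that pattern (the broker URL and node name are illustrative, not from this commit):

    from celery import Celery

    celery = Celery("pvcapid", broker="redis://localhost:6379/0")  # hypothetical broker

    # No task function is defined or imported here; whichever worker
    # consumes the "hv1" queue must have a task registered under this
    # exact name ("vm.flush_locks" is one of the names used below).
    task = celery.send_task(
        "vm.flush_locks",
        kwargs={"domain": "testvm", "run_on": "hv1"},
        queue="hv1",
    )
    print(task.id)  # an AsyncResult, the same handle apply_async() returns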


@@ -99,35 +99,28 @@ def get_primary_node(zkhandler):
     return getPrimaryNode(zkhandler)
 
 
-# Set up Celery queue routing
-def route_task(name, args, kwargs, options, task=None, **kw):
-    print("----")
-    print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
+# Set up Celery task ID generator
+# 1. Lets us make our own IDs (first section of UUID)
+# 2. Lets us distribute jobs to the required pvcworkerd instances
+def run_celery_task(task_name, **kwargs):
+    task_id = str(uuid4()).split("-")[0]
 
-    # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
-    if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
-        run_on = kwargs[options["routing_key"]]
-        if run_on == "primary":
-            run_on = get_primary_node()
-    # Otherwise, use the primary node
+    if "run_on" in kwargs and kwargs["run_on"] != "primary":
+        run_on = kwargs["run_on"]
     else:
         run_on = get_primary_node()
 
-    print(f"Selected Celery worker: {run_on}")
-    print("----")
-
-    return run_on
-
-
-# Set up Celery task ID generator
-# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
-def run_celery_task(task_def, **kwargs):
-    task_id = str(uuid4()).split("-")[0]
-    task = task_def.apply_async(
-        (),
-        kwargs,
-        task_id=task_id,
+    print(
+        f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
     )
+
+    task = celery.send_task(
+        task_name,
+        task_id=task_id,
+        kwargs=kwargs,
+        queue=run_on,
+    )
+
     return task
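
A note on the ID scheme run_celery_task() keeps from the old code: only the first group of a UUID4 is used, i.e. 8 hex characters (32 bits). A quick standard-library-only illustration:

    from uuid import uuid4

    task_id = str(uuid4()).split("-")[0]
    print(task_id)  # e.g. "3f9c2b7a": short enough to read and paste,
                    # but at ~4.3 billion values it is no longer
                    # collision-proof the way a full UUID is
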
@@ -153,7 +146,6 @@ def celery_startup():
 
     app.config["task_queues"] = tuple(
         [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
     )
-    app.config["task_routes"] = (route_task,)
 
     celery.conf.update(app.config)
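
With the task_routes router gone, delivery is controlled entirely by the per-node queues configured just above plus the queue= argument passed to celery.send_task(). A sketch of what that configuration expands to, and the worker-side pairing it implies (node names and the pvcworkerd invocation are assumptions, not shown in this diff):

    from kombu import Queue

    # What app.config["task_queues"] above would expand to for a
    # hypothetical three-node cluster:
    task_queues = tuple(
        Queue(h, routing_key=f"{h}.#") for h in ("hv1", "hv2", "hv3")
    )

    # Each node's worker then subscribes to just its own queue, e.g.:
    #   celery -A pvcworkerd worker --queues hv1 --hostname worker@hv1
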
@@ -220,171 +212,6 @@ def Authenticator(function):
     return authenticate
 
 
-#
-# Job functions
-#
-@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
-def create_vm(
-    self,
-    vm_name=None,
-    profile_name=None,
-    define_vm=True,
-    start_vm=True,
-    script_run_args=[],
-    run_on="primary",
-):
-    return api_vmbuilder.create_vm(
-        self,
-        vm_name,
-        profile_name,
-        define_vm=define_vm,
-        start_vm=start_vm,
-        script_run_args=script_run_args,
-    )
-
-
-@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
-def run_benchmark(self, pool=None, run_on="primary"):
-    return api_benchmark.run_benchmark(self, pool)
-
-
-@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
-def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
-    @ZKConnection(config)
-    def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
-        return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
-
-    return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
-
-
-@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
-def vm_device_attach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_attach(zkhandler, self, domain, xml):
-        return vm_worker_attach_device(zkhandler, self, domain, xml)
-
-    return run_vm_device_attach(self, domain, xml)
-
-
-@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
-def vm_device_detach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_detach(zkhandler, self, domain, xml):
-        return vm_worker_detach_device(zkhandler, self, domain, xml)
-
-    return run_vm_device_detach(self, domain, xml)
-
-
-@celery.task(name="osd.add", bind=True, routing_key="run_on")
-def osd_add(
-    self,
-    device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    split_count=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_add(
-        zkhandler,
-        self,
-        run_on,
-        device,
-        weight,
-        ext_db_ratio=None,
-        ext_db_size=None,
-        split_count=None,
-    ):
-        return osd_worker_add_osd(
-            zkhandler,
-            self,
-            run_on,
-            device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-            split_count,
-        )
-
-    return run_osd_add(
-        self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
-    )
-
-
-@celery.task(name="osd.replace", bind=True, routing_key="run_on")
-def osd_replace(
-    self,
-    osd_id=None,
-    new_device=None,
-    old_device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_replace(
-        zkhandler,
-        self,
-        run_on,
-        osd_id,
-        new_device,
-        old_device=None,
-        weight=None,
-        ext_db_ratio=None,
-        ext_db_size=None,
-    ):
-        return osd_worker_replace_osd(
-            zkhandler,
-            self,
-            run_on,
-            osd_id,
-            new_device,
-            old_device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-        )
-
-    return run_osd_replace(
-        self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
-    )
-
-
-@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
-def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
-        return osd_worker_refresh_osd(
-            zkhandler, self, run_on, osd_id, device, ext_db_flag
-        )
-
-    return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
-
-
-@celery.task(name="osd.remove", bind=True, routing_key="run_on")
-def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_remove(
-        zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
-    ):
-        return osd_worker_remove_osd(
-            zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
-        )
-
-    return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
-
-
-@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
-def osd_add_db_vg(self, device=None, run_on=None):
-    @ZKConnection(config)
-    def run_osd_add_db_vg(zkhandler, self, run_on, device):
-        return osd_worker_add_db_vg(zkhandler, self, run_on, device)
-
-    return run_osd_add_db_vg(self, run_on, device)
-
-
 ##########################################################
 # API Root/Authentication
 ##########################################################
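
This diff shows only the API side; per the commit message, the definitions removed above now live in pvcworkerd. That code is not part of this commit, but a worker-side registration consistent with it might look like the following sketch (the broker URL and task body are placeholders; only the name= value comes from this diff):

    from celery import Celery

    celery = Celery("pvcworkerd", broker="redis://localhost:6379/0")  # hypothetical broker

    @celery.task(name="vm.flush_locks", bind=True)
    def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
        # name= is what the API's celery.send_task("vm.flush_locks", ...)
        # resolves against, so this function never has to be importable
        # from the Flask process; a real body would call the
        # vm_worker_flush_locks() helper seen in the removed code.
        return f"Flushed locks for {domain} (force={force_unlock})"
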
@@ -2459,7 +2286,7 @@ class API_VM_Locks(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
+        task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
 
         return (
             {"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@@ -2594,7 +2421,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
 
         return (
             {"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@@ -2648,7 +2475,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
 
         return (
             {"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@@ -4405,7 +4232,7 @@ class API_Storage_Ceph_Benchmark(Resource):
             }, 400
 
         task = run_celery_task(
-            run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
+            "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
         )
 
         return (
             {
@@ -4531,7 +4358,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
         node = reqargs.get("node", None)
 
         task = run_celery_task(
-            osd_add_db_vg, device=reqargs.get("device", None), run_on=node
+            "osd.add_db_vg", device=reqargs.get("device", None), run_on=node
         )
 
         return (
@@ -4730,7 +4557,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
         node = reqargs.get("node", None)
 
         task = run_celery_task(
-            osd_add,
+            "osd.add",
             device=reqargs.get("device", None),
             weight=reqargs.get("weight", None),
             ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4849,7 +4676,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_replace,
+            "osd.replace",
             osd_id=osdid,
             new_device=reqargs.get("new_device"),
             old_device=reqargs.get("old_device", None),
@@ -4907,7 +4734,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_refresh,
+            "osd.refresh",
             osd_id=osdid,
             device=reqargs.get("device", None),
             ext_db_flag=False,
@@ -4978,7 +4805,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_remove,
+            "osd.remove",
             osd_id=osdid,
             force_flag=reqargs.get("force", False),
             run_on=node,
@@ -8507,7 +8334,7 @@ class API_Provisioner_Create_Root(Resource):
             start_vm = False
 
         task = run_celery_task(
-            create_vm,
+            "provisioner.create",
             vm_name=reqargs.get("name", None),
             profile_name=reqargs.get("profile", None),
             define_vm=define_vm,