From 0c0fb65c6220efd0c441329901d367a6300b8640 Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface"
Date: Thu, 30 Nov 2023 00:40:09 -0500
Subject: [PATCH] Rework Flask API to route Celery tasks manually

Avoids needing to define any of these tasks here; they can all be defined in the pvcworkerd code.
---
 api-daemon/pvcapid/flaskapi.py | 227 ++++-----------------------------
 1 file changed, 27 insertions(+), 200 deletions(-)

diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index f10eecf8..dfca2340 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -99,35 +99,28 @@ def get_primary_node(zkhandler):
     return getPrimaryNode(zkhandler)
 
 
-# Set up Celery queue routing
-def route_task(name, args, kwargs, options, task=None, **kw):
-    print("----")
-    print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
+# Set up Celery task ID generator
+# 1. Lets us make our own IDs (first section of UUID)
+# 2. Lets us distribute jobs to the required pvcworkerd instances
+def run_celery_task(task_name, **kwargs):
+    task_id = str(uuid4()).split("-")[0]
 
-    # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
-    if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
-        run_on = kwargs[options["routing_key"]]
-        if run_on == "primary":
-            run_on = get_primary_node()
-    # Otherwise, use the primary node
+    if "run_on" in kwargs and kwargs["run_on"] != "primary":
+        run_on = kwargs["run_on"]
     else:
         run_on = get_primary_node()
 
-    print(f"Selected Celery worker: {run_on}")
-    print("----")
-
-    return run_on
-
-
-# Set up Celery task ID generator
-# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
-def run_celery_task(task_def, **kwargs):
-    task_id = str(uuid4()).split("-")[0]
-    task = task_def.apply_async(
-        (),
-        kwargs,
-        task_id=task_id,
+    print(
+        f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
     )
+
+    task = celery.send_task(
+        task_name,
+        task_id=task_id,
+        kwargs=kwargs,
+        queue=run_on,
+    )
+
     return task
@@ -153,7 +146,6 @@ def celery_startup():
     app.config["task_queues"] = tuple(
         [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
     )
-    app.config["task_routes"] = (route_task,)
 
     celery.conf.update(app.config)
@@ -220,171 +212,6 @@ def Authenticator(function):
     return authenticate
 
 
-#
-# Job functions
-#
-@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
-def create_vm(
-    self,
-    vm_name=None,
-    profile_name=None,
-    define_vm=True,
-    start_vm=True,
-    script_run_args=[],
-    run_on="primary",
-):
-    return api_vmbuilder.create_vm(
-        self,
-        vm_name,
-        profile_name,
-        define_vm=define_vm,
-        start_vm=start_vm,
-        script_run_args=script_run_args,
-    )
-
-
-@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
-def run_benchmark(self, pool=None, run_on="primary"):
-    return api_benchmark.run_benchmark(self, pool)
-
-
-@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
-def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
-    @ZKConnection(config)
-    def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
-        return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
-
-    return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
-
-
-@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
-def vm_device_attach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_attach(zkhandler, self, domain, xml):
-        return vm_worker_attach_device(zkhandler, self, domain, xml)
-
-    return run_vm_device_attach(self, domain, xml)
-
-
-@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
-def vm_device_detach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_detach(zkhandler, self, domain, xml):
-        return vm_worker_detach_device(zkhandler, self, domain, xml)
-
-    return run_vm_device_detach(self, domain, xml)
-
-
-@celery.task(name="osd.add", bind=True, routing_key="run_on")
-def osd_add(
-    self,
-    device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    split_count=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_add(
-        zkhandler,
-        self,
-        run_on,
-        device,
-        weight,
-        ext_db_ratio=None,
-        ext_db_size=None,
-        split_count=None,
-    ):
-        return osd_worker_add_osd(
-            zkhandler,
-            self,
-            run_on,
-            device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-            split_count,
-        )
-
-    return run_osd_add(
-        self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
-    )
-
-
-@celery.task(name="osd.replace", bind=True, routing_key="run_on")
-def osd_replace(
-    self,
-    osd_id=None,
-    new_device=None,
-    old_device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_replace(
-        zkhandler,
-        self,
-        run_on,
-        osd_id,
-        new_device,
-        old_device=None,
-        weight=None,
-        ext_db_ratio=None,
-        ext_db_size=None,
-    ):
-        return osd_worker_replace_osd(
-            zkhandler,
-            self,
-            run_on,
-            osd_id,
-            new_device,
-            old_device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-        )
-
-    return run_osd_replace(
-        self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
-    )
-
-
-@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
-def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
-        return osd_worker_refresh_osd(
-            zkhandler, self, run_on, osd_id, device, ext_db_flag
-        )
-
-    return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
-
-
-@celery.task(name="osd.remove", bind=True, routing_key="run_on")
-def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_remove(
-        zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
-    ):
-        return osd_worker_remove_osd(
-            zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
-        )
-
-    return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
-
-
-@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
-def osd_add_db_vg(self, device=None, run_on=None):
-    @ZKConnection(config)
-    def run_osd_add_db_vg(zkhandler, self, run_on, device):
-        return osd_worker_add_db_vg(zkhandler, self, run_on, device)
-
-    return run_osd_add_db_vg(self, run_on, device)
-
-
 ##########################################################
 # API Root/Authentication
 ##########################################################
@@ -2459,7 +2286,7 @@ class API_VM_Locks(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
+        task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
 
         return (
             {"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@@ -2594,7 +2421,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
 
         return (
             {"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@@ -2648,7 +2475,7 @@ class API_VM_Device(Resource):
         else:
            return vm_node_detail, retcode
 
-        task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
 
        return (
             {"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@@ -4405,7 +4232,7 @@ class API_Storage_Ceph_Benchmark(Resource):
             }, 400
 
         task = run_celery_task(
-            run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
+            "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
         )
         return (
             {
@@ -4531,7 +4358,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
         node = reqargs.get("node", None)
 
         task = run_celery_task(
-            osd_add_db_vg, device=reqargs.get("device", None), run_on=node
+            "osd.add_db_vg", device=reqargs.get("device", None), run_on=node
         )
 
         return (
@@ -4730,7 +4557,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
         node = reqargs.get("node", None)
 
         task = run_celery_task(
-            osd_add,
+            "osd.add",
             device=reqargs.get("device", None),
             weight=reqargs.get("weight", None),
             ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4849,7 +4676,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_replace,
+            "osd.replace",
             osd_id=osdid,
             new_device=reqargs.get("new_device"),
             old_device=reqargs.get("old_device", None),
@@ -4907,7 +4734,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_refresh,
+            "osd.refresh",
             osd_id=osdid,
             device=reqargs.get("device", None),
             ext_db_flag=False,
@@ -4978,7 +4805,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
 
         task = run_celery_task(
-            osd_remove,
+            "osd.remove",
             osd_id=osdid,
             force_flag=reqargs.get("force", False),
             run_on=node,
@@ -8507,7 +8334,7 @@ class API_Provisioner_Create_Root(Resource):
             start_vm = False
 
         task = run_celery_task(
-            create_vm,
+            "provisioner.create",
             vm_name=reqargs.get("name", None),
             profile_name=reqargs.get("profile", None),
             define_vm=define_vm,
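The heart of this change is the reworked run_celery_task() helper in the first hunk: instead of importing task functions, the API dispatches by task name with celery.send_task(), pins the job to a per-node queue, and assigns a short task ID, so the task bodies only need to exist in pvcworkerd. The standalone sketch below illustrates that dispatch pattern; it is not part of the patch, and the broker URL, node names, and the fallback to the first node (standing in for get_primary_node()) are assumptions for illustration. It assumes celery/kombu are installed and a broker is reachable.

# Sketch only: name-based Celery dispatch with per-node queues, mirroring
# the run_celery_task() helper added in this patch. Broker URL and node
# names are assumed values, not taken from PVC.
from uuid import uuid4

from celery import Celery
from kombu import Queue

celery = Celery(
    "pvcapid",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# One queue per worker node, so a task can be pinned to the node that must run it
nodes = ["hv1", "hv2", "hv3"]
celery.conf.task_queues = tuple(Queue(h, routing_key=f"{h}.#") for h in nodes)


def run_celery_task(task_name, **kwargs):
    # Short task ID: first section of a UUID, as in the patch
    task_id = str(uuid4()).split("-")[0]

    # Honour an explicit run_on; otherwise fall back to a default node
    # (the real code resolves "primary" via get_primary_node())
    if kwargs.get("run_on") not in (None, "primary"):
        run_on = kwargs["run_on"]
    else:
        run_on = nodes[0]

    # send_task() dispatches by task *name*, so the task body only has to
    # exist in the worker process, not in this API process
    return celery.send_task(task_name, task_id=task_id, kwargs=kwargs, queue=run_on)


if __name__ == "__main__":
    task = run_celery_task("vm.flush_locks", domain="testvm", run_on="hv2")
    print(f"Dispatched task '{task.id}' to queue hv2")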