Use custom task IDs for Celery tasks

Full UUIDs were obnoxiously long, so switch to using just the first
8-character segment of a UUID instead. This keeps task lists nice and
short, makes the IDs easier to copy, and is just generally nicer.

Could this cause uniqueness problems? Perhaps, but with 2^32 possible
8-character prefixes I don't see collisions happening often enough to
matter.
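
For scale, a quick birthday-approximation sketch of the collision odds
(illustrative only, not part of this change):

    # Birthday bound: P(collision) ~ 1 - exp(-n^2 / 2N) for n random IDs
    # drawn from N = 16^8 = 2^32 possible 8-character hex prefixes.
    import math

    N = 16 ** 8

    for n in (1_000, 10_000, 100_000):
        p = 1 - math.exp(-n * n / (2 * N))
        print(f"{n:>7} task IDs -> P(collision) ~ {p:.4%}")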
Joshua Boniface 2023-11-16 13:22:14 -05:00
parent 3651885954
commit 0cb81f96e6
1 changed file with 33 additions and 10 deletions


@@ -26,6 +26,7 @@ from flask_restful import Resource, Api, reqparse, abort
 from celery import Celery
 from kombu import Queue
 from lxml.objectify import fromstring as lxml_fromstring
+from uuid import uuid4
 
 from daemon_lib.common import getPrimaryNode
 from daemon_lib.zkhandler import ZKConnection
@@ -102,6 +103,19 @@ def route_task(name, args, kwargs, options, task=None, **kw):
 app.config["CELERY_ROUTES"] = (route_task,)
 
 
+# Set up Celery task ID generator
+# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
+def run_celery_task(task_def, **kwargs):
+    task_id = str(uuid4()).split("-")[0]
+    task = task_def.apply_async(
+        (),
+        kwargs,
+        task_id=task_id,
+    )
+    return task
+
+
 # Set up SQLAlchemy backend
 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
 app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
@@ -2441,7 +2455,7 @@ class API_VM_Locks(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_flush_locks.delay(domain=vm, run_on=vm_node)
+        task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
 
         return (
             {"task_id": task.id, "run_on": vm_node},
@@ -2576,7 +2590,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_device_attach.delay(domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
 
         return (
             {"task_id": task.id, "run_on": vm_node},
@@ -2630,7 +2644,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_device_detach.delay(domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
 
         return (
             {"task_id": task.id, "run_on": vm_node},
@@ -4386,7 +4400,9 @@ class API_Storage_Ceph_Benchmark(Resource):
                 "message": 'Pool "{}" is not valid.'.format(reqargs.get("pool"))
             }, 400
 
-        task = run_benchmark.delay(pool=reqargs.get("pool", None), run_on="primary")
+        task = run_celery_task(
+            run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
+        )
 
         return (
             {"task_id": task.id, "run_on": get_primary_node()},
             202,
@@ -4506,7 +4522,9 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
         """
         node = reqargs.get("node", None)
 
-        task = osd_add_db_vg.delay(device=reqargs.get("device", None), run_on=node)
+        task = run_celery_task(
+            osd_add_db_vg, device=reqargs.get("device", None), run_on=node
+        )
 
         return (
             {"task_id": task.id, "run_on": node},
@@ -4703,7 +4721,8 @@ class API_Storage_Ceph_OSD_Root(Resource):
         """
         node = reqargs.get("node", None)
 
-        task = osd_add.delay(
+        task = run_celery_task(
+            osd_add,
             device=reqargs.get("device", None),
             weight=reqargs.get("weight", None),
             ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4821,7 +4840,8 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_replace.delay(
+        task = run_celery_task(
+            osd_replace,
             osd_id=osdid,
             new_device=reqargs.get("new_device"),
             old_device=reqargs.get("old_device", None),
@@ -4878,7 +4898,8 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_refresh.delay(
+        task = run_celery_task(
+            osd_refresh,
             osd_id=osdid,
             device=reqargs.get("device", None),
             ext_db_flag=False,
@@ -4948,7 +4969,8 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_remove.delay(
+        task = run_celery_task(
+            osd_remove,
             osd_id=osdid,
             force_flag=reqargs.get("force", False),
             run_on=node,
@@ -8476,7 +8498,8 @@ class API_Provisioner_Create_Root(Resource):
         else:
             start_vm = False
 
-        task = create_vm.delay(
+        task = run_celery_task(
+            create_vm,
             vm_name=reqargs.get("name", None),
             profile_name=reqargs.get("profile", None),
             define_vm=define_vm,