From 484e6542c222f67463b46ddf918a86c666f1cafb Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Thu, 16 Nov 2023 01:57:56 -0500 Subject: [PATCH] Port remaining tasks to new task handler Move the create_vm and run_benchmark tasks to use the new Celery subsystem, handlers, and wait command. Remove the obsolete, dedicated API endpoints. Standardize the CLI client and move the repeated handler code into a separate common function. --- api-daemon/pvcapid/flaskapi.py | 125 ++++-------------------------- client-cli/pvc/cli/cli.py | 35 +++++---- client-cli/pvc/cli/waiters.py | 57 -------------- client-cli/pvc/lib/common.py | 18 +++++ client-cli/pvc/lib/provisioner.py | 17 +--- client-cli/pvc/lib/storage.py | 93 ++-------------------- client-cli/pvc/lib/vm.py | 18 +---- 7 files changed, 67 insertions(+), 296 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index b828ca70..f9ca4728 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -198,9 +198,15 @@ def Authenticator(function): # # Job functions # -@celery.task(name="provisioner.create", bind=True) +@celery.task(name="provisioner.create", bind=True, routing_key="run_on") def create_vm( - self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[] + self, + vm_name, + profile_name, + define_vm=True, + start_vm=True, + script_run_args=[], + run_on="primary", ): return api_vmbuilder.create_vm( self, @@ -212,8 +218,8 @@ def create_vm( ) -@celery.task(name="storage.benchmark", bind=True) -def run_benchmark(self, pool): +@celery.task(name="storage.benchmark", bind=True, routing_key="run_on") +def run_benchmark(self, pool, run_on="primary"): return api_benchmark.run_benchmark(self, pool) @@ -4373,11 +4379,11 @@ class API_Storage_Ceph_Benchmark(Resource): "message": 'Pool "{}" is not valid.'.format(reqargs.get("pool")) }, 400 - task = run_benchmark.delay(reqargs.get("pool", None)) + task = run_benchmark.delay(reqargs.get("pool", None), run_on="primary") return ( {"task_id": task.id}, 202, - {"Location": Api.url_for(api, API_Storage_Ceph_Benchmark, task_id=task.id)}, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @@ -8468,116 +8474,13 @@ class API_Provisioner_Create_Root(Resource): define_vm=define_vm, start_vm=start_vm, script_run_args=reqargs.get("arg", []), + run_on="primary", ) return ( {"task_id": task.id}, 202, - { - "Location": Api.url_for( - api, API_Provisioner_Status_Element, task_id=task.id - ) - }, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) api.add_resource(API_Provisioner_Create_Root, "/provisioner/create") - - -# /provisioner/status -class API_Provisioner_Status_Root(Resource): - @Authenticator - def get(self): - """ - View status of provisioner Celery queue - --- - tags: - - provisioner - responses: - 200: - description: OK - schema: - type: object - properties: - active: - type: object - description: Celery app.control.inspect active tasks - reserved: - type: object - description: Celery app.control.inspect reserved tasks - scheduled: - type: object - description: Celery app.control.inspect scheduled tasks - """ - queue = celery.control.inspect() - response = { - "scheduled": queue.scheduled(), - "active": queue.active(), - "reserved": queue.reserved(), - } - return response - - -api.add_resource(API_Provisioner_Status_Root, "/provisioner/status") - - -# /provisioner/status/ -class API_Provisioner_Status_Element(Resource): - @Authenticator - def get(self, task_id): - """ - View status of a provisioner Celery worker job {task_id} - --- - tags: - - provisioner - responses: - 200: - description: OK - schema: - type: object - properties: - total: - type: integer - description: Total number of steps - current: - type: integer - description: Current steps completed - state: - type: string - description: Current job state - status: - type: string - description: Status details about job - 404: - description: Not found - schema: - type: object - id: Message - """ - task = create_vm.AsyncResult(task_id) - if task.state == "PENDING": - response = { - "state": task.state, - "current": 0, - "total": 1, - "status": "Pending job start", - } - elif task.state != "FAILURE": - response = { - "state": task.state, - "current": task.info.get("current", 0), - "total": task.info.get("total", 1), - "status": task.info.get("status", ""), - } - if "result" in task.info: - response["result"] = task.info["result"] - else: - response = { - "state": task.state, - "current": 1, - "total": 1, - "status": str(task.info), - } - return response - - -api.add_resource(API_Provisioner_Status_Element, "/provisioner/status/") diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py index dd7a4d5e..cc31ebe2 100644 --- a/client-cli/pvc/cli/cli.py +++ b/client-cli/pvc/cli/cli.py @@ -3296,15 +3296,26 @@ def cli_storage_benchmark(): @click.command(name="run", short_help="Run a storage benchmark.") @connection_req @click.argument("pool") +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt( "Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue" ) -def cli_storage_benchmark_run(pool): +def cli_storage_benchmark_run(pool, wait_flag): """ Run a storage benchmark on POOL in the background. """ - retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool) + retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag) + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -5581,15 +5592,15 @@ def cli_provisioner_profile_list(limit, format_function): help="Start the VM automatically upon completion of provisioning.", ) @click.option( - "-w", - "--wait", + "--wait/--no-wait", "wait_flag", is_flag=True, - default=False, - help="Wait for provisioning to complete, showing progress", + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", ) def cli_provisioner_create( - name, profile, wait_flag, define_flag, start_flag, script_args + name, profile, define_flag, start_flag, script_args, wait_flag ): """ Create a new VM NAME with profile PROFILE. @@ -5610,15 +5621,13 @@ def cli_provisioner_create( if not define_flag: start_flag = False - retcode, retdata = pvc.lib.provisioner.vm_create( - CLI_CONFIG, name, profile, wait_flag, define_flag, start_flag, script_args + retcode, retmsg = pvc.lib.provisioner.vm_create( + CLI_CONFIG, name, profile, define_flag, start_flag, script_args, wait_flag ) if retcode and wait_flag: - task_id = retdata - retdata = wait_for_provisioner(CLI_CONFIG, task_id) - - finish(retcode, retdata) + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) + finish(retcode, retmsg) ############################################################################### diff --git a/client-cli/pvc/cli/waiters.py b/client-cli/pvc/cli/waiters.py index 93fefee4..70749b41 100644 --- a/client-cli/pvc/cli/waiters.py +++ b/client-cli/pvc/cli/waiters.py @@ -132,60 +132,3 @@ def wait_for_celery_task(CLI_CONFIG, task_detail): retdata = task_status.get("state") + ": " + task_status.get("status") return retdata - - -def wait_for_provisioner(CLI_CONFIG, task_id): - """ - Wait for a provisioner task to complete - """ - - echo(CLI_CONFIG, f"Task ID: {task_id}") - echo(CLI_CONFIG, "") - - # Wait for the task to start - echo(CLI_CONFIG, "Waiting for task to start...", newline=False) - while True: - sleep(1) - task_status = pvc.lib.provisioner.task_status( - CLI_CONFIG, task_id, is_watching=True - ) - if task_status.get("state") != "PENDING": - break - echo(CLI_CONFIG, ".", newline=False) - echo(CLI_CONFIG, " done.") - echo(CLI_CONFIG, "") - - # Start following the task state, updating progress as we go - total_task = task_status.get("total") - with progressbar(length=total_task, show_eta=False) as bar: - last_task = 0 - maxlen = 0 - while True: - sleep(1) - if task_status.get("state") != "RUNNING": - break - if task_status.get("current") > last_task: - current_task = int(task_status.get("current")) - bar.update(current_task - last_task) - last_task = current_task - # The extensive spaces at the end cause this to overwrite longer previous messages - curlen = len(str(task_status.get("status"))) - if curlen > maxlen: - maxlen = curlen - lendiff = maxlen - curlen - overwrite_whitespace = " " * lendiff - echo( - CLI_CONFIG, - " " + task_status.get("status") + overwrite_whitespace, - newline=False, - ) - task_status = pvc.lib.provisioner.task_status( - CLI_CONFIG, task_id, is_watching=True - ) - if task_status.get("state") == "SUCCESS": - bar.update(total_task - last_task) - - echo(CLI_CONFIG, "") - retdata = task_status.get("state") + ": " + task_status.get("status") - - return retdata diff --git a/client-cli/pvc/lib/common.py b/client-cli/pvc/lib/common.py index a25c4e61..2fd633dd 100644 --- a/client-cli/pvc/lib/common.py +++ b/client-cli/pvc/lib/common.py @@ -202,6 +202,24 @@ def call_api( return response +def get_wait_retdata(response, wait_flag): + if response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() + else: + retvalue = False + retdata = response.json().get("message", "") + + return retvalue, retdata + + def task_status(config, task_id=None, is_watching=False): """ Get information about Celery job {task_id}, or all tasks if None diff --git a/client-cli/pvc/lib/provisioner.py b/client-cli/pvc/lib/provisioner.py index cc183e22..e4b1b5e5 100644 --- a/client-cli/pvc/lib/provisioner.py +++ b/client-cli/pvc/lib/provisioner.py @@ -25,7 +25,7 @@ from requests_toolbelt.multipart.encoder import ( ) import pvc.lib.ansiprint as ansiprint -from pvc.lib.common import UploadProgressBar, call_api +from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata from ast import literal_eval @@ -700,7 +700,7 @@ def profile_remove(config, name): return retvalue, response.json().get("message", "") -def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_args): +def vm_create(config, name, profile, define_flag, start_flag, script_args, wait_flag): """ Create a new VM named {name} with profile {profile} @@ -717,18 +717,7 @@ def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_ } response = call_api(config, "post", "/provisioner/create", params=params) - if response.status_code == 202: - retvalue = True - if not wait_flag: - retdata = "Task ID: {}".format(response.json()["task_id"]) - else: - # Just return the task_id raw, instead of formatting it - retdata = response.json()["task_id"] - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def task_status(config, task_id=None, is_watching=False): diff --git a/client-cli/pvc/lib/storage.py b/client-cli/pvc/lib/storage.py index 3e1fa482..667d5525 100644 --- a/client-cli/pvc/lib/storage.py +++ b/client-cli/pvc/lib/storage.py @@ -29,7 +29,7 @@ from requests_toolbelt.multipart.encoder import ( ) import pvc.lib.ansiprint as ansiprint -from pvc.lib.common import UploadProgressBar, call_api +from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata # # Supplemental functions @@ -175,21 +175,7 @@ def ceph_osd_db_vg_add(config, node, device, wait_flag): params = {"node": node, "device": device} response = call_api(config, "post", "/storage/ceph/osddb", params=params) - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) # @@ -265,21 +251,7 @@ def ceph_osd_add( response = call_api(config, "post", "/storage/ceph/osd", params=params) - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def ceph_osd_replace( @@ -308,21 +280,7 @@ def ceph_osd_replace( response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params) - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def ceph_osd_refresh(config, osdid, device, wait_flag): @@ -338,21 +296,7 @@ def ceph_osd_refresh(config, osdid, device, wait_flag): } response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params) - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def ceph_osd_remove(config, osdid, force_flag, wait_flag): @@ -368,21 +312,7 @@ def ceph_osd_remove(config, osdid, force_flag, wait_flag): config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params ) - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def ceph_osd_state(config, osdid, state): @@ -1765,7 +1695,7 @@ def format_list_snapshot(config, snapshot_list): # # Benchmark functions # -def ceph_benchmark_run(config, pool): +def ceph_benchmark_run(config, pool, wait_flag): """ Run a storage benchmark against {pool} @@ -1776,14 +1706,7 @@ def ceph_benchmark_run(config, pool): params = {"pool": pool} response = call_api(config, "post", "/storage/ceph/benchmark", params=params) - if response.status_code == 202: - retvalue = True - retdata = "Task ID: {}".format(response.json()["task_id"]) - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def ceph_benchmark_list(config, job): diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py index 4fe21a49..b2690cd9 100644 --- a/client-cli/pvc/lib/vm.py +++ b/client-cli/pvc/lib/vm.py @@ -23,7 +23,7 @@ import time import re import pvc.lib.ansiprint as ansiprint -from pvc.lib.common import call_api, format_bytes, format_metric +from pvc.lib.common import call_api, format_bytes, format_metric, get_wait_retdata # @@ -425,21 +425,7 @@ def vm_locks(config, vm, wait_flag): """ response = call_api(config, "post", f"/vm/{vm}/locks") - if response.status_code == 202: - retvalue = True - retjson = response.json() - if not wait_flag: - retdata = ( - f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" - ) - else: - # Just return the task JSON without formatting - retdata = response.json() - else: - retvalue = False - retdata = response.json().get("message", "") - - return retvalue, retdata + return get_wait_retdata(response, wait_flag) def vm_backup(config, vm, backup_path, incremental_parent=None, retain_snapshot=False):