Add Celery task list output

This commit is contained in:
Joshua Boniface 2020-01-12 14:01:47 -05:00
parent ed84df5237
commit 2057859b9f
4 changed files with 229 additions and 34 deletions

View File

@ -35,6 +35,7 @@ from functools import wraps
from flask_restful import Resource, Api, reqparse, abort
from celery import Celery
from celery.task.control import inspect
import api_lib.pvcapi_helper as api_helper
import api_lib.pvcapi_provisioner as api_provisioner
@ -5439,6 +5440,40 @@ class API_Provisioner_Create_Root(Resource):
return { "task_id": task.id }, 202, { 'Location': Api.url_for(api, API_Provisioner_Status_Element, task_id=task.id) }
api.add_resource(API_Provisioner_Create_Root, '/provisioner/create')
# /provisioner/status
class API_Provisioner_Status_Root(Resource):
@Authenticator
def get(self):
"""
View status of provisioner Celery queue
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
active:
type: object
description: Celery app.control.inspect active tasks
reserved:
type: object
description: Celery app.control.inspect reserved tasks
scheduled:
type: object
description: Celery app.control.inspect scheduled tasks
"""
queue = celery.control.inspect(timeout=0.1)
response = {
'scheduled': queue.scheduled(),
'active': queue.active(),
'reserved': queue.reserved()
}
return response
api.add_resource(API_Provisioner_Status_Root, '/provisioner/status')
# /provisioner/status/<task_id>
class API_Provisioner_Status_Element(Resource):
@Authenticator

View File

@ -23,6 +23,7 @@
import time
import re
import subprocess
import ast
import cli_lib.ansiprint as ansiprint
from cli_lib.common import call_api
@ -453,47 +454,55 @@ def vm_create(config, name, profile, wait_flag, define_flag, start_flag):
return retvalue, retdata
def task_status(config, task_id, is_watching=False):
def task_status(config, task_id=None, is_watching=False):
"""
Get information about provisioner job {task_id}
Get information about provisioner job {task_id} or all tasks if None
API endpoint: GET /api/v1/provisioner/status
API arguments:
API schema: {json_data_object}
"""
response = call_api(config, 'get', '/provisioner/status/{task_id}'.format(task_id=task_id))
if task_id is not None:
response = call_api(config, 'get', '/provisioner/status/{task_id}'.format(task_id=task_id))
else:
response = call_api(config, 'get', '/provisioner/status')
if response.status_code == 200:
if task_id is not None:
if response.status_code == 200:
retvalue = True
respjson = response.json()
if is_watching:
# Just return the raw JSON to the watching process instead of formatting it
return respjson
job_state = respjson['state']
if job_state == 'RUNNING':
retdata = 'Job state: RUNNING\nStage: {}/{}\nStatus: {}'.format(
respjson['current'],
respjson['total'],
respjson['status']
)
elif job_state == 'FAILED':
retdata = 'Job state: FAILED\nStatus: {}'.format(
respjson['status']
)
elif job_state == 'COMPLETED':
retdata = 'Job state: COMPLETED\nStatus: {}'.format(
respjson['status']
)
else:
retdata = 'Job state: {}\nStatus: {}'.format(
respjson['state'],
respjson['status']
)
else:
retvalue = False
retdata = response.json()['message']
else:
retvalue = True
respjson = response.json()
if is_watching:
# Just return the raw JSON to the watching process instead of formatting it
return respjson
job_state = respjson['state']
if job_state == 'RUNNING':
retdata = 'Job state: RUNNING\nStage: {}/{}\nStatus: {}'.format(
respjson['current'],
respjson['total'],
respjson['status']
)
elif job_state == 'FAILED':
retdata = 'Job state: FAILED\nStatus: {}'.format(
respjson['status']
)
elif job_state == 'COMPLETED':
retdata = 'Job state: COMPLETED\nStatus: {}'.format(
respjson['status']
)
else:
retdata = 'Job state: {}\nStatus: {}'.format(
respjson['state'],
respjson['status']
)
else:
retvalue = False
retdata = response.json()['message']
retdata = format_list_task(respjson)
return retvalue, retdata
@ -1142,3 +1151,123 @@ Data: {profile_userdata: <{profile_userdata_length}} \
)
return '\n'.join([profile_list_output_header] + profile_list_output)
def format_list_task(task_data_raw):
# Format the Celery data into a more useful data structure
task_data = list()
for task_type in ['active', 'reserved', 'scheduled']:
type_data = task_data_raw[task_type]
if not type_data:
type_data = dict()
for task_host in type_data:
for task_job in task_data_raw[task_type][task_host]:
task = dict()
if task_type == 'reserved':
task['type'] = 'pending'
else:
task['type'] = task_type
task['worker'] = task_host
task['id'] = task_job.get('id')
task_args = ast.literal_eval(task_job.get('args'))
task['vm_name'] = task_args[0]
task['vm_profile'] = task_args[1]
task_kwargs = ast.literal_eval(task_job.get('kwargs'))
task['vm_define'] = str(bool(task_kwargs['define_vm']))
task['vm_start'] = str(bool(task_kwargs['start_vm']))
task_data.append(task)
task_list_output = []
# Determine optimal column widths
task_id_length = 3
task_type_length = 7
task_vm_name_length = 5
task_vm_profile_length = 8
task_vm_define_length = 8
task_vm_start_length = 7
task_worker_length = 8
for task in task_data:
# task_id column
_task_id_length = len(str(task['id'])) + 1
if _task_id_length > task_id_length:
task_id_length = _task_id_length
# task_type column
_task_type_length = len(str(task['type'])) + 1
if _task_type_length > task_type_length:
task_type_length = _task_type_length
# task_vm_name column
_task_vm_name_length = len(str(task['vm_name'])) + 1
if _task_vm_name_length > task_vm_name_length:
task_vm_name_length = _task_vm_name_length
# task_vm_profile column
_task_vm_profile_length = len(str(task['vm_profile'])) + 1
if _task_vm_profile_length > task_vm_profile_length:
task_vm_profile_length = _task_vm_profile_length
# task_vm_define column
_task_vm_define_length = len(str(task['vm_define'])) + 1
if _task_vm_define_length > task_vm_define_length:
task_vm_define_length = _task_vm_define_length
# task_vm_start column
_task_vm_start_length = len(str(task['vm_start'])) + 1
if _task_vm_start_length > task_vm_start_length:
task_vm_start_length = _task_vm_start_length
# task_worker column
_task_worker_length = len(str(task['worker'])) + 1
if _task_worker_length > task_worker_length:
task_worker_length = _task_worker_length
# Format the string (header)
task_list_output_header = '{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
VM: {task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}'.format(
task_id_length=task_id_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_vm_name_length=task_vm_name_length,
task_vm_profile_length=task_vm_profile_length,
task_vm_define_length=task_vm_define_length,
task_vm_start_length=task_vm_start_length,
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
task_id='ID',
task_type='Status',
task_worker='Worker',
task_vm_name='Name',
task_vm_profile='Profile',
task_vm_define='Define?',
task_vm_start='Start?'
)
# Format the string (elements)
for task in sorted(task_data, key=lambda i: i.get('type', None)):
task_list_output.append(
'{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}'.format(
task_id_length=task_id_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_vm_name_length=task_vm_name_length,
task_vm_profile_length=task_vm_profile_length,
task_vm_define_length=task_vm_define_length,
task_vm_start_length=task_vm_start_length,
bold='',
end_bold='',
task_id=task['id'],
task_type=task['type'],
task_worker=task['worker'],
task_vm_name=task['vm_name'],
task_vm_profile=task['vm_profile'],
task_vm_define=task['vm_define'],
task_vm_start=task['vm_start']
)
)
return '\n'.join([task_list_output_header] + task_list_output)

View File

@ -3029,11 +3029,11 @@ def provisioner_create(name, profile, wait_flag, define_flag, start_flag):
###############################################################################
@click.command(name='status', short_help='Show status of provisioner job.')
@click.argument(
'job'
'job', required=False, default=None
)
def provisioner_status(job):
"""
Show status of provisioner job JOB.
Show status of provisioner job JOB or a list of jobs.
"""
retcode, retdata = pvc_provisioner.task_status(config, job)
cleanup(retcode, retdata)

View File

@ -2568,6 +2568,37 @@
]
}
},
"/api/v1/provisioner/status": {
"get": {
"description": "",
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"active": {
"description": "Celery app.control.inspect active tasks",
"type": "object"
},
"reserved": {
"description": "Celery app.control.inspect reserved tasks",
"type": "object"
},
"scheduled": {
"description": "Celery app.control.inspect scheduled tasks",
"type": "object"
}
},
"type": "object"
}
}
},
"summary": "View status of provisioner Celery queue",
"tags": [
"provisioner"
]
}
},
"/api/v1/provisioner/status/{task_id}": {
"get": {
"description": "",