From 2057859b9fc226dab285cb6450e4163e027c78a5 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sun, 12 Jan 2020 14:01:47 -0500 Subject: [PATCH] Add Celery task list output --- client-api/pvc-api.py | 35 ++++++ client-cli/cli_lib/provisioner.py | 193 +++++++++++++++++++++++++----- client-cli/pvc.py | 4 +- docs/manuals/swagger.json | 31 +++++ 4 files changed, 229 insertions(+), 34 deletions(-) diff --git a/client-api/pvc-api.py b/client-api/pvc-api.py index ab85ac44..4ed53a0d 100755 --- a/client-api/pvc-api.py +++ b/client-api/pvc-api.py @@ -35,6 +35,7 @@ from functools import wraps from flask_restful import Resource, Api, reqparse, abort from celery import Celery +from celery.task.control import inspect import api_lib.pvcapi_helper as api_helper import api_lib.pvcapi_provisioner as api_provisioner @@ -5439,6 +5440,40 @@ class API_Provisioner_Create_Root(Resource): return { "task_id": task.id }, 202, { 'Location': Api.url_for(api, API_Provisioner_Status_Element, task_id=task.id) } api.add_resource(API_Provisioner_Create_Root, '/provisioner/create') +# /provisioner/status +class API_Provisioner_Status_Root(Resource): + @Authenticator + def get(self): + """ + View status of provisioner Celery queue + --- + tags: + - provisioner + responses: + 200: + description: OK + schema: + type: object + properties: + active: + type: object + description: Celery app.control.inspect active tasks + reserved: + type: object + description: Celery app.control.inspect reserved tasks + scheduled: + type: object + description: Celery app.control.inspect scheduled tasks + """ + queue = celery.control.inspect(timeout=0.1) + response = { + 'scheduled': queue.scheduled(), + 'active': queue.active(), + 'reserved': queue.reserved() + } + return response +api.add_resource(API_Provisioner_Status_Root, '/provisioner/status') + # /provisioner/status/ class API_Provisioner_Status_Element(Resource): @Authenticator diff --git a/client-cli/cli_lib/provisioner.py b/client-cli/cli_lib/provisioner.py index ea2d81eb..4425e2f7 100644 --- a/client-cli/cli_lib/provisioner.py +++ b/client-cli/cli_lib/provisioner.py @@ -23,6 +23,7 @@ import time import re import subprocess +import ast import cli_lib.ansiprint as ansiprint from cli_lib.common import call_api @@ -453,47 +454,55 @@ def vm_create(config, name, profile, wait_flag, define_flag, start_flag): return retvalue, retdata -def task_status(config, task_id, is_watching=False): +def task_status(config, task_id=None, is_watching=False): """ - Get information about provisioner job {task_id} + Get information about provisioner job {task_id} or all tasks if None API endpoint: GET /api/v1/provisioner/status API arguments: API schema: {json_data_object} """ - response = call_api(config, 'get', '/provisioner/status/{task_id}'.format(task_id=task_id)) + if task_id is not None: + response = call_api(config, 'get', '/provisioner/status/{task_id}'.format(task_id=task_id)) + else: + response = call_api(config, 'get', '/provisioner/status') - if response.status_code == 200: + if task_id is not None: + if response.status_code == 200: + retvalue = True + respjson = response.json() + + if is_watching: + # Just return the raw JSON to the watching process instead of formatting it + return respjson + + job_state = respjson['state'] + if job_state == 'RUNNING': + retdata = 'Job state: RUNNING\nStage: {}/{}\nStatus: {}'.format( + respjson['current'], + respjson['total'], + respjson['status'] + ) + elif job_state == 'FAILED': + retdata = 'Job state: FAILED\nStatus: {}'.format( + respjson['status'] + ) + elif job_state == 'COMPLETED': + retdata = 'Job state: COMPLETED\nStatus: {}'.format( + respjson['status'] + ) + else: + retdata = 'Job state: {}\nStatus: {}'.format( + respjson['state'], + respjson['status'] + ) + else: + retvalue = False + retdata = response.json()['message'] + else: retvalue = True respjson = response.json() - - if is_watching: - # Just return the raw JSON to the watching process instead of formatting it - return respjson - - job_state = respjson['state'] - if job_state == 'RUNNING': - retdata = 'Job state: RUNNING\nStage: {}/{}\nStatus: {}'.format( - respjson['current'], - respjson['total'], - respjson['status'] - ) - elif job_state == 'FAILED': - retdata = 'Job state: FAILED\nStatus: {}'.format( - respjson['status'] - ) - elif job_state == 'COMPLETED': - retdata = 'Job state: COMPLETED\nStatus: {}'.format( - respjson['status'] - ) - else: - retdata = 'Job state: {}\nStatus: {}'.format( - respjson['state'], - respjson['status'] - ) - else: - retvalue = False - retdata = response.json()['message'] + retdata = format_list_task(respjson) return retvalue, retdata @@ -1142,3 +1151,123 @@ Data: {profile_userdata: <{profile_userdata_length}} \ ) return '\n'.join([profile_list_output_header] + profile_list_output) + +def format_list_task(task_data_raw): + # Format the Celery data into a more useful data structure + task_data = list() + for task_type in ['active', 'reserved', 'scheduled']: + type_data = task_data_raw[task_type] + if not type_data: + type_data = dict() + for task_host in type_data: + for task_job in task_data_raw[task_type][task_host]: + task = dict() + if task_type == 'reserved': + task['type'] = 'pending' + else: + task['type'] = task_type + task['worker'] = task_host + task['id'] = task_job.get('id') + task_args = ast.literal_eval(task_job.get('args')) + task['vm_name'] = task_args[0] + task['vm_profile'] = task_args[1] + task_kwargs = ast.literal_eval(task_job.get('kwargs')) + task['vm_define'] = str(bool(task_kwargs['define_vm'])) + task['vm_start'] = str(bool(task_kwargs['start_vm'])) + task_data.append(task) + + task_list_output = [] + + # Determine optimal column widths + task_id_length = 3 + task_type_length = 7 + task_vm_name_length = 5 + task_vm_profile_length = 8 + task_vm_define_length = 8 + task_vm_start_length = 7 + task_worker_length = 8 + + for task in task_data: + # task_id column + _task_id_length = len(str(task['id'])) + 1 + if _task_id_length > task_id_length: + task_id_length = _task_id_length + # task_type column + _task_type_length = len(str(task['type'])) + 1 + if _task_type_length > task_type_length: + task_type_length = _task_type_length + # task_vm_name column + _task_vm_name_length = len(str(task['vm_name'])) + 1 + if _task_vm_name_length > task_vm_name_length: + task_vm_name_length = _task_vm_name_length + # task_vm_profile column + _task_vm_profile_length = len(str(task['vm_profile'])) + 1 + if _task_vm_profile_length > task_vm_profile_length: + task_vm_profile_length = _task_vm_profile_length + # task_vm_define column + _task_vm_define_length = len(str(task['vm_define'])) + 1 + if _task_vm_define_length > task_vm_define_length: + task_vm_define_length = _task_vm_define_length + # task_vm_start column + _task_vm_start_length = len(str(task['vm_start'])) + 1 + if _task_vm_start_length > task_vm_start_length: + task_vm_start_length = _task_vm_start_length + # task_worker column + _task_worker_length = len(str(task['worker'])) + 1 + if _task_worker_length > task_worker_length: + task_worker_length = _task_worker_length + + # Format the string (header) + task_list_output_header = '{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \ +{task_worker: <{task_worker_length}} \ +VM: {task_vm_name: <{task_vm_name_length}} \ +{task_vm_profile: <{task_vm_profile_length}} \ +{task_vm_define: <{task_vm_define_length}} \ +{task_vm_start: <{task_vm_start_length}}{end_bold}'.format( + task_id_length=task_id_length, + task_type_length=task_type_length, + task_worker_length=task_worker_length, + task_vm_name_length=task_vm_name_length, + task_vm_profile_length=task_vm_profile_length, + task_vm_define_length=task_vm_define_length, + task_vm_start_length=task_vm_start_length, + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + task_id='ID', + task_type='Status', + task_worker='Worker', + task_vm_name='Name', + task_vm_profile='Profile', + task_vm_define='Define?', + task_vm_start='Start?' + ) + + # Format the string (elements) + for task in sorted(task_data, key=lambda i: i.get('type', None)): + task_list_output.append( + '{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \ +{task_worker: <{task_worker_length}} \ + {task_vm_name: <{task_vm_name_length}} \ +{task_vm_profile: <{task_vm_profile_length}} \ +{task_vm_define: <{task_vm_define_length}} \ +{task_vm_start: <{task_vm_start_length}}{end_bold}'.format( + task_id_length=task_id_length, + task_type_length=task_type_length, + task_worker_length=task_worker_length, + task_vm_name_length=task_vm_name_length, + task_vm_profile_length=task_vm_profile_length, + task_vm_define_length=task_vm_define_length, + task_vm_start_length=task_vm_start_length, + bold='', + end_bold='', + task_id=task['id'], + task_type=task['type'], + task_worker=task['worker'], + task_vm_name=task['vm_name'], + task_vm_profile=task['vm_profile'], + task_vm_define=task['vm_define'], + task_vm_start=task['vm_start'] + ) + ) + + return '\n'.join([task_list_output_header] + task_list_output) diff --git a/client-cli/pvc.py b/client-cli/pvc.py index 009fc5d0..a24b797c 100755 --- a/client-cli/pvc.py +++ b/client-cli/pvc.py @@ -3029,11 +3029,11 @@ def provisioner_create(name, profile, wait_flag, define_flag, start_flag): ############################################################################### @click.command(name='status', short_help='Show status of provisioner job.') @click.argument( - 'job' + 'job', required=False, default=None ) def provisioner_status(job): """ - Show status of provisioner job JOB. + Show status of provisioner job JOB or a list of jobs. """ retcode, retdata = pvc_provisioner.task_status(config, job) cleanup(retcode, retdata) diff --git a/docs/manuals/swagger.json b/docs/manuals/swagger.json index 6922dee5..f1b82b1f 100644 --- a/docs/manuals/swagger.json +++ b/docs/manuals/swagger.json @@ -2568,6 +2568,37 @@ ] } }, + "/api/v1/provisioner/status": { + "get": { + "description": "", + "responses": { + "200": { + "description": "OK", + "schema": { + "properties": { + "active": { + "description": "Celery app.control.inspect active tasks", + "type": "object" + }, + "reserved": { + "description": "Celery app.control.inspect reserved tasks", + "type": "object" + }, + "scheduled": { + "description": "Celery app.control.inspect scheduled tasks", + "type": "object" + } + }, + "type": "object" + } + } + }, + "summary": "View status of provisioner Celery queue", + "tags": [ + "provisioner" + ] + } + }, "/api/v1/provisioner/status/{task_id}": { "get": { "description": "",