Port remaining tasks to new task handler

Move the create_vm and run_benchmark tasks to use the new Celery
subsystem, handlers, and wait command. Remove the obsolete, dedicated
API endpoints.

Standardize the CLI client and move the repeated handler code into a
separate common function.
This commit is contained in:
2023-11-16 01:57:56 -05:00
parent aef38639cf
commit 484e6542c2
7 changed files with 67 additions and 296 deletions

View File

@ -198,9 +198,15 @@ def Authenticator(function):
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True)
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[]
self,
vm_name,
profile_name,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return api_vmbuilder.create_vm(
self,
@ -212,8 +218,8 @@ def create_vm(
)
@celery.task(name="storage.benchmark", bind=True)
def run_benchmark(self, pool):
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def run_benchmark(self, pool, run_on="primary"):
return api_benchmark.run_benchmark(self, pool)
@ -4373,11 +4379,11 @@ class API_Storage_Ceph_Benchmark(Resource):
"message": 'Pool "{}" is not valid.'.format(reqargs.get("pool"))
}, 400
task = run_benchmark.delay(reqargs.get("pool", None))
task = run_benchmark.delay(reqargs.get("pool", None), run_on="primary")
return (
{"task_id": task.id},
202,
{"Location": Api.url_for(api, API_Storage_Ceph_Benchmark, task_id=task.id)},
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -8468,116 +8474,13 @@ class API_Provisioner_Create_Root(Resource):
define_vm=define_vm,
start_vm=start_vm,
script_run_args=reqargs.get("arg", []),
run_on="primary",
)
return (
{"task_id": task.id},
202,
{
"Location": Api.url_for(
api, API_Provisioner_Status_Element, task_id=task.id
)
},
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_Provisioner_Create_Root, "/provisioner/create")
# /provisioner/status
class API_Provisioner_Status_Root(Resource):
@Authenticator
def get(self):
"""
View status of provisioner Celery queue
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
active:
type: object
description: Celery app.control.inspect active tasks
reserved:
type: object
description: Celery app.control.inspect reserved tasks
scheduled:
type: object
description: Celery app.control.inspect scheduled tasks
"""
queue = celery.control.inspect()
response = {
"scheduled": queue.scheduled(),
"active": queue.active(),
"reserved": queue.reserved(),
}
return response
api.add_resource(API_Provisioner_Status_Root, "/provisioner/status")
# /provisioner/status/<task_id>
class API_Provisioner_Status_Element(Resource):
@Authenticator
def get(self, task_id):
"""
View status of a provisioner Celery worker job {task_id}
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
total:
type: integer
description: Total number of steps
current:
type: integer
description: Current steps completed
state:
type: string
description: Current job state
status:
type: string
description: Status details about job
404:
description: Not found
schema:
type: object
id: Message
"""
task = create_vm.AsyncResult(task_id)
if task.state == "PENDING":
response = {
"state": task.state,
"current": 0,
"total": 1,
"status": "Pending job start",
}
elif task.state != "FAILURE":
response = {
"state": task.state,
"current": task.info.get("current", 0),
"total": task.info.get("total", 1),
"status": task.info.get("status", ""),
}
if "result" in task.info:
response["result"] = task.info["result"]
else:
response = {
"state": task.state,
"current": 1,
"total": 1,
"status": str(task.info),
}
return response
api.add_resource(API_Provisioner_Status_Element, "/provisioner/status/<task_id>")