Port VM on-node tasks to Celery worker system

Adds Celery versions of the flush_locks, device_attach, and
device_detach functions.
This commit is contained in:
2023-11-05 22:32:41 -05:00
parent f0c2e9d295
commit 89681d54b9
9 changed files with 577 additions and 174 deletions

View File

@ -25,10 +25,16 @@ from functools import wraps
from flask_restful import Resource, Api, reqparse, abort
from celery import Celery
from kombu import Queue
from lxml.objectify import fromstring as lxml_fromstring
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from pvcapid.Daemon import config, strtobool, API_VERSION
@ -50,10 +56,8 @@ app.config["CELERY_RESULT_BACKEND"] = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
)
# Set up Celery queues
app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True}
@ZKConnection(config)
def get_all_nodes(zkhandler):
_, all_nodes = get_node_list(zkhandler, None)
@ -206,6 +210,33 @@ def run_benchmark(self, pool):
return api_benchmark.run_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain, xml, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain, xml, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
##########################################################
# API Root/Authentication
##########################################################
@ -629,6 +660,106 @@ class API_Status(Resource):
api.add_resource(API_Status, "/status")
# /tasks
class API_Tasks(Resource):
@Authenticator
def get(self):
"""
Return a list of active Celery worker tasks
---
tags:
- root
responses:
200:
description: OK
schema:
type: object
properties:
active:
type: object
description: Celery app.control.inspect active tasks
reserved:
type: object
description: Celery app.control.inspect reserved tasks
scheduled:
type: object
description: Celery app.control.inspect scheduled tasks
"""
queue = celery.control.inspect()
response = {
"scheduled": queue.scheduled(),
"active": queue.active(),
"reserved": queue.reserved(),
}
return response
api.add_resource(API_Tasks, "/tasks")
# /tasks/<task_id>
class API_Tasks_Element(Resource):
@Authenticator
def get(self, task_id):
"""
View status of a Celery worker task {task_id}
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
total:
type: integer
description: Total number of steps
current:
type: integer
description: Current steps completed
state:
type: string
description: Current job state
status:
type: string
description: Status details about job
404:
description: Not found
schema:
type: object
id: Message
"""
task = celery.AsyncResult(task_id)
if task.state == "PENDING":
response = {
"state": task.state,
"current": 0,
"total": 1,
"status": "Pending job start",
}
elif task.state != "FAILURE":
response = {
"state": task.state,
"current": task.info.get("current", 0),
"total": task.info.get("total", 1),
"status": task.info.get("status", ""),
}
if "result" in task.info:
response["result"] = task.info["result"]
else:
response = {
"state": task.state,
"current": 1,
"total": 1,
"status": str(task.info),
}
return response
api.add_resource(API_Tasks_Element, "/tasks/<task_id>")
##########################################################
# Client API - Node
##########################################################
@ -2168,18 +2299,25 @@ class API_VM_Locks(Resource):
tags:
- vm
responses:
200:
202:
description: OK
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
type: string
description: The Celery job ID of the task
"""
return api_helper.vm_flush_locks(vm)
vm_node_detail, retcode = api_helper.vm_node(vm)
if retcode == 200:
vm_node = vm_node_detail["node"]
else:
return vm_node_detail, retcode
task = vm_flush_locks.delay(vm, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Locks, "/vm/<vm>/locks")
@ -2296,7 +2434,25 @@ class API_VM_Device(Resource):
type: object
id: Message
"""
return api_helper.vm_attach_device(vm, reqargs.get("xml", None))
try:
xml = reqargs.get("xml", None)
lxml_fromstring(xml)
except Exception:
return {"message": "Specified XML document is not valid"}, 400
vm_node_detail, retcode = api_helper.vm_node(vm)
if retcode == 200:
vm_node = vm_node_detail["node"]
else:
return vm_node_detail, retcode
task = vm_device_attach.delay(vm, xml, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@RequestParser(
[
@ -2332,7 +2488,25 @@ class API_VM_Device(Resource):
type: object
id: Message
"""
return api_helper.vm_detach_device(vm, reqargs.get("xml", None))
try:
xml = reqargs.get("xml", None)
lxml_fromstring(xml)
except Exception:
return {"message": "Specified XML document is not valid"}, 400
vm_node_detail, retcode = api_helper.vm_node(vm)
if retcode == 200:
vm_node = vm_node_detail["node"]
else:
return vm_node_detail, retcode
task = vm_device_detach.delay(vm, xml, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Device, "/vm/<vm>/device")
@ -8161,7 +8335,7 @@ class API_Provisioner_Status_Root(Resource):
type: object
description: Celery app.control.inspect scheduled tasks
"""
queue = celery.control.inspect(timeout=0.1)
queue = celery.control.inspect()
response = {
"scheduled": queue.scheduled(),
"active": queue.active(),

View File

@ -355,6 +355,9 @@ def vm_node(zkhandler, vm):
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
)
if len(retdata) > 0:
retdata = retdata[0]
if retflag:
if retdata:
retcode = 200