Port all Celery worker functions to discrete pkg
Moves all tasks run by the Celery worker into a discrete package/module for easier installation. Also adjusts several parameters throughout to accomplish this.
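
For context on the parameter adjustments, compare the two `list_benchmarks` call sites in the flaskapi.py hunk below: the helper that previously lived in `pvcapid.benchmark` (and pulled `config` from `pvcapid.Daemon` at import time) is now imported from `daemon_lib.benchmark` and receives the config explicitly. A minimal sketch of the call-site change; the names are taken from the hunks below, while the internals of the new `daemon_lib.benchmark` module are not shown in this commit view:

    # Before: the helper was bound to the API daemon's import-time config
    import pvcapid.benchmark as api_benchmark
    data, retcode = api_benchmark.list_benchmarks(job)

    # After: the helper lives in the shared daemon_lib package and takes the
    # config as an explicit argument, so any node with the package installed
    # (e.g. a standalone Celery worker) can call it
    from daemon_lib.benchmark import list_benchmarks
    data, retcode = list_benchmarks(config, job)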
@@ -1,433 +0,0 @@
#!/usr/bin/env python3

# benchmark.py - PVC API Benchmark functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import psycopg2
import psycopg2.extras

from datetime import datetime
from json import loads, dumps

from pvcapid.Daemon import config

from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, update, finish

import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph


# Define the current test format
TEST_FORMAT = 1


# We run a total of 8 tests, to give a generalized idea of performance on the cluster:
# 1. A sequential read test of 8GB with a 4M block size
# 2. A sequential write test of 8GB with a 4M block size
# 3. A random read test of 8GB with a 4M block size
# 4. A random write test of 8GB with a 4M block size
# 5. A random read test of 8GB with a 4K block size
# 6. A random write test of 8GB with a 4K block size
# 7. A random read test of 8GB with a 4K block size and an I/O depth of 1
# 8. A random write test of 8GB with a 4K block size and an I/O depth of 1
# Taken together, these 8 results should give a very good indication of the overall storage performance
# for a variety of workloads.
test_matrix = {
    "seq_read": {
        "direction": "read",
        "iodepth": "64",
        "bs": "4M",
        "rw": "read",
    },
    "seq_write": {
        "direction": "write",
        "iodepth": "64",
        "bs": "4M",
        "rw": "write",
    },
    "rand_read_4M": {
        "direction": "read",
        "iodepth": "64",
        "bs": "4M",
        "rw": "randread",
    },
    "rand_write_4M": {
        "direction": "write",
        "iodepth": "64",
        "bs": "4M",
        "rw": "randwrite",
    },
    "rand_read_4K": {
        "direction": "read",
        "iodepth": "64",
        "bs": "4K",
        "rw": "randread",
    },
    "rand_write_4K": {
        "direction": "write",
        "iodepth": "64",
        "bs": "4K",
        "rw": "randwrite",
    },
    "rand_read_4K_lowdepth": {
        "direction": "read",
        "iodepth": "1",
        "bs": "4K",
        "rw": "randread",
    },
    "rand_write_4K_lowdepth": {
        "direction": "write",
        "iodepth": "1",
        "bs": "4K",
        "rw": "randwrite",
    },
}


# Specify the benchmark volume name and size
benchmark_volume_name = "pvcbenchmark"
benchmark_volume_size = "8G"


#
# Exceptions (used by Celery tasks)
#
class BenchmarkError(Exception):
    pass


#
# Common functions
#


def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
    if db_conn is not None and db_cur is not None:
        # Clean up our dangling result
        query = "DELETE FROM storage_benchmarks WHERE job = %s;"
        args = (job_name,)
        db_cur.execute(query, args)
        db_conn.commit()
        # Close the database connections cleanly
        close_database(db_conn, db_cur)
    if zkhandler is not None:
        zkhandler.disconnect()
        del zkhandler


# Database connections
def open_database(config):
    conn = psycopg2.connect(
        host=config["api_postgresql_host"],
        port=config["api_postgresql_port"],
        dbname=config["api_postgresql_name"],
        user=config["api_postgresql_user"],
        password=config["api_postgresql_password"],
    )
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    return conn, cur


def close_database(conn, cur, failed=False):
    if not failed:
        conn.commit()
    cur.close()
    conn.close()


def list_benchmarks(job=None):
    if job is not None:
        query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
        args = (job,)
    else:
        query = "SELECT * FROM {} ORDER BY id DESC;".format("storage_benchmarks")
        args = ()

    conn, cur = open_database(config)
    cur.execute(query, args)
    orig_data = cur.fetchall()
    data = list()
    for benchmark in orig_data:
        benchmark_data = dict()
        benchmark_data["id"] = benchmark["id"]
        benchmark_data["job"] = benchmark["job"]
        benchmark_data["test_format"] = benchmark["test_format"]
        if benchmark["result"] == "Running":
            benchmark_data["benchmark_result"] = "Running"
        else:
            try:
                benchmark_data["benchmark_result"] = loads(benchmark["result"])
            except Exception:
                benchmark_data["benchmark_result"] = {}
        # Append the new data to our actual output structure
        data.append(benchmark_data)
    close_database(conn, cur)
    if data:
        return data, 200
    else:
        return {"message": "No benchmark found."}, 404


def prepare_benchmark_volume(
    pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
    # Create the RBD volume
    retcode, retmsg = pvc_ceph.add_volume(
        zkhandler, pool, benchmark_volume_name, benchmark_volume_size
    )
    if not retcode:
        cleanup(
            job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
        fail(
            None,
            f'Failed to create volume "{benchmark_volume_name}" on pool "{pool}": {retmsg}',
        )
    else:
        log_info(None, retmsg)


def cleanup_benchmark_volume(
    pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
    # Remove the RBD volume
    retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, benchmark_volume_name)
    if not retcode:
        cleanup(
            job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
        fail(
            None,
            f'Failed to remove volume "{benchmark_volume_name}" from pool "{pool}": {retmsg}',
        )
    else:
        log_info(None, retmsg)


def run_benchmark_job(
    test, pool, job_name=None, db_conn=None, db_cur=None, zkhandler=None
):
    test_spec = test_matrix[test]
    log_info(None, f"Running test '{test}'")
    fio_cmd = """
        fio \
        --name={test} \
        --ioengine=rbd \
        --pool={pool} \
        --rbdname={volume} \
        --output-format=json \
        --direct=1 \
        --randrepeat=1 \
        --numjobs=1 \
        --time_based \
        --runtime=75 \
        --group_reporting \
        --iodepth={iodepth} \
        --bs={bs} \
        --readwrite={rw}
    """.format(
        test=test,
        pool=pool,
        volume=benchmark_volume_name,
        iodepth=test_spec["iodepth"],
        bs=test_spec["bs"],
        rw=test_spec["rw"],
    )

    log_info(None, "Running fio job: {}".format(" ".join(fio_cmd.split())))
    retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
    try:
        jstdout = loads(stdout)
        if retcode:
            raise
    except Exception:
        cleanup(
            job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
        fail(
            None,
            f"Failed to run fio test '{test}': {stderr}",
        )

    return jstdout


def run_benchmark(self, pool):
    # Phase 0 - connect to databases
    try:
        zkhandler = ZKHandler(config)
        zkhandler.connect()
    except Exception:
        fail(
            self,
            "Failed to connect to Zookeeper",
        )

    cur_time = datetime.now().isoformat(timespec="seconds")
    cur_primary = zkhandler.read("base.config.primary_node")
    job_name = f"{cur_time}_{cur_primary}"

    current_stage = 0
    total_stages = 13
    start(
        self,
        f"Running storage benchmark '{job_name}' on pool '{pool}'",
        current=current_stage,
        total=total_stages,
    )

    try:
        db_conn, db_cur = open_database(config)
    except Exception:
        cleanup(
            job_name,
            db_conn=None,
            db_cur=None,
            zkhandler=zkhandler,
        )
        fail(
            self,
            "Failed to connect to Postgres",
        )

    current_stage += 1
    update(
        self,
        "Storing running status in database",
        current=current_stage,
        total=total_stages,
    )

    try:
        query = "INSERT INTO storage_benchmarks (job, test_format, result) VALUES (%s, %s, %s);"
        args = (
            job_name,
            TEST_FORMAT,
            "Running",
        )
        db_cur.execute(query, args)
        db_conn.commit()
    except Exception as e:
        cleanup(
            job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
        fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)

    current_stage += 1
    update(
        self,
        "Creating benchmark volume",
        current=current_stage,
        total=total_stages,
    )

    prepare_benchmark_volume(
        pool,
        job_name=job_name,
        db_conn=db_conn,
        db_cur=db_cur,
        zkhandler=zkhandler,
    )

    # Phase 2 - benchmark run
    results = dict()
    for test in test_matrix:
        current_stage += 1
        update(
            self,
            f"Running benchmark job '{test}'",
            current=current_stage,
            total=total_stages,
        )

        results[test] = run_benchmark_job(
            test,
            pool,
            job_name=job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )

    # Phase 3 - cleanup
    current_stage += 1
    update(
        self,
"Cleaning up venchmark volume",
        current=current_stage,
        total=total_stages,
    )

    cleanup_benchmark_volume(
        pool,
        job_name=job_name,
        db_conn=db_conn,
        db_cur=db_cur,
        zkhandler=zkhandler,
    )

    current_stage += 1
    update(
        self,
        "Storing results in database",
        current=current_stage,
        total=total_stages,
    )

    try:
        query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;"
        args = (dumps(results), job_name)
        db_cur.execute(query, args)
        db_conn.commit()
    except Exception as e:
        cleanup(
            job_name,
            db_conn=db_conn,
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
        fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)

    cleanup(
        job_name,
        db_conn=db_conn,
        db_cur=db_cur,
        zkhandler=zkhandler,
    )

    current_stage += 1
    return finish(
        self,
        f"Storage benchmark {job_name} completed successfully",
        current=current_stage,
        total=total_stages,
    )
@@ -31,25 +31,12 @@ from uuid import uuid4
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list
from daemon_lib.vm import (
    vm_worker_flush_locks,
    vm_worker_attach_device,
    vm_worker_detach_device,
)
from daemon_lib.ceph import (
    osd_worker_add_osd,
    osd_worker_replace_osd,
    osd_worker_refresh_osd,
    osd_worker_remove_osd,
    osd_worker_add_db_vg,
)
from daemon_lib.benchmark import list_benchmarks

from pvcapid.Daemon import config, strtobool, API_VERSION

import pvcapid.helper as api_helper
import pvcapid.provisioner as api_provisioner
import pvcapid.vmbuilder as api_vmbuilder
import pvcapid.benchmark as api_benchmark
import pvcapid.ova as api_ova

from flask_sqlalchemy import SQLAlchemy
@@ -4191,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
                type: string (integer)
                description: The number of minor page faults during the test
        """
-        return api_benchmark.list_benchmarks(reqargs.get("job", None))
+        return list_benchmarks(config, reqargs.get("job", None))

    @RequestParser(
        [
@@ -1,851 +0,0 @@
#!/usr/bin/env python3

# vmbuilder.py - PVC API VM builder (provisioner) functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import json
import psycopg2
import psycopg2.extras
import re
import os

# import sys
import importlib.util
import uuid

from contextlib import contextmanager

from pvcapid.Daemon import config

from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish

import daemon_lib.common as pvc_common
import daemon_lib.node as pvc_node
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
import daemon_lib.ceph as pvc_ceph


#
# Exceptions (used by Celery tasks)
#
class ValidationError(Exception):
    """
    An exception that results from some value being un- or mis-defined.
    """

    pass


class ClusterError(Exception):
    """
    An exception that results from the PVC cluster being out of alignment with the action.
    """

    pass


class ProvisioningError(Exception):
    """
    An exception that results from a failure of a provisioning command.
    """

    pass


#
# VMBuilder class - subclassed by install scripts
#
class VMBuilder(object):
    def __init__(
        self,
        vm_name,
        vm_id,
        vm_profile,
        vm_data,
    ):
        self.vm_name = vm_name
        self.vm_id = vm_id
        self.vm_uuid = uuid.uuid4()
        self.vm_profile = vm_profile
        self.vm_data = vm_data

    #
    # Helper class functions; used by the individual scripts
    #
    def log_info(self, msg):
        log_info(None, msg)

    def log_warn(self, msg):
        log_warn(None, msg)

    def log_err(self, msg):
        log_err(None, msg)

    def fail(self, msg, exception=ProvisioningError):
        raise exception(msg)

    #
    # Primary class functions; implemented by the individual scripts
    #
    def setup(self):
        """
        setup(): Perform special setup steps before proceeding
        OPTIONAL
        """
        pass

    def create(self):
        """
        create(): Create the VM libvirt schema definition which is defined afterwards
        """
        pass

    def prepare(self):
        """
        prepare(): Prepare any disks/volumes for the install step
        """
        pass

    def install(self):
        """
        install(): Perform the installation
        """
        pass

    def cleanup(self):
        """
        cleanup(): Perform any cleanup required after the prepare() step or on failure of the install() step
        """
        pass


#
# Helper functions (as context managers)
#
@contextmanager
def chroot(destination):
    """
    Change root directory to a given destination
    """
    try:
        real_root = os.open("/", os.O_RDONLY)
        os.chroot(destination)
        fake_root = os.open("/", os.O_RDONLY)
        os.fchdir(fake_root)
        yield
    except Exception:
        raise
    finally:
        os.fchdir(real_root)
        os.chroot(".")
        os.fchdir(real_root)
        os.close(fake_root)
        os.close(real_root)
        del fake_root
        del real_root


@contextmanager
def open_db(config):
    try:
        conn = psycopg2.connect(
            host=config["api_postgresql_host"],
            port=config["api_postgresql_port"],
            dbname=config["api_postgresql_name"],
            user=config["api_postgresql_user"],
            password=config["api_postgresql_password"],
        )
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    except Exception:
        fail(
            None,
            "Failed to connect to Postgres",
            exception=ClusterError,
        )

    try:
        yield cur
    except Exception:
        raise
    finally:
        conn.commit()
        cur.close()
        conn.close()
        del conn


@contextmanager
def open_zk(config):
    try:
        zkhandler = ZKHandler(config)
        zkhandler.connect()
    except Exception:
        fail(
            None,
            "Failed to connect to Zookeeper",
            exception=ClusterError,
        )

    try:
        yield zkhandler
    except Exception:
        raise
    finally:
        zkhandler.disconnect()
        del zkhandler


#
# Main VM provisioning function - executed by the Celery worker
#
def create_vm(
    celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
):
    current_stage = 0
    total_stages = 11
    start(
        celery,
        f"Provisioning new VM '{vm_name}' with profile '{vm_profile}'",
        current=current_stage,
        total=total_stages,
    )

    # Phase 1 - setup
    # * Get the profile elements
    # * Get the details from these elements
    # * Assemble a VM configuration dictionary
    current_stage += 1
    update(
        celery,
        "Collecting configuration details",
        current=current_stage,
        total=total_stages,
    )

    # Extract any trailing numeric suffix from the VM name to use as the VM ID
    vm_id = re.findall(r"(\d+)$", vm_name)
    if not vm_id:
        vm_id = 0
    else:
        vm_id = vm_id[0]

    vm_data = dict()

    with open_db(config) as db_cur:
        # Get the profile information
        query = "SELECT * FROM profile WHERE name = %s"
        args = (vm_profile,)
        db_cur.execute(query, args)
        profile_data = db_cur.fetchone()
        if profile_data.get("arguments"):
            vm_data["script_arguments"] = profile_data.get("arguments").split("|")
        else:
            vm_data["script_arguments"] = []

        # Get the system details
        query = "SELECT * FROM system_template WHERE id = %s"
        args = (profile_data["system_template"],)
        db_cur.execute(query, args)
        vm_data["system_details"] = db_cur.fetchone()

        # Get the MAC template
        query = "SELECT mac_template FROM network_template WHERE id = %s"
        args = (profile_data["network_template"],)
        db_cur.execute(query, args)
        db_row = db_cur.fetchone()
        if db_row:
            vm_data["mac_template"] = db_row.get("mac_template")
        else:
            vm_data["mac_template"] = None

        # Get the networks
        query = "SELECT * FROM network WHERE network_template = %s"
        args = (profile_data["network_template"],)
        db_cur.execute(query, args)
        _vm_networks = db_cur.fetchall()
        vm_networks = list()

        # Set the eth_bridge for each network
        for network in _vm_networks:
            vni = network["vni"]
            if vni in ["upstream", "cluster", "storage"]:
                eth_bridge = "br{}".format(vni)
            else:
                eth_bridge = "vmbr{}".format(vni)
            network["eth_bridge"] = eth_bridge
            vm_networks.append(network)
        vm_data["networks"] = vm_networks

        # Get the storage volumes
        # ORDER BY ensures disks are always in the sdX/vdX order, regardless of add order
        query = "SELECT * FROM storage WHERE storage_template = %s ORDER BY disk_id"
        args = (profile_data["storage_template"],)
        db_cur.execute(query, args)
        vm_data["volumes"] = db_cur.fetchall()

        # Get the script
        query = "SELECT script FROM script WHERE id = %s"
        args = (profile_data["script"],)
        db_cur.execute(query, args)
        db_row = db_cur.fetchone()
        if db_row:
            vm_data["script"] = db_row.get("script")
        else:
            vm_data["script"] = None

        if profile_data.get("profile_type") == "ova":
            query = "SELECT * FROM ova WHERE id = %s"
            args = (profile_data["ova"],)
            db_cur.execute(query, args)
            vm_data["ova_details"] = db_cur.fetchone()

            query = "SELECT * FROM ova_volume WHERE ova = %s"
            args = (profile_data["ova"],)
            db_cur.execute(query, args)
            # Replace the existing volumes list with our OVA volume list
            vm_data["volumes"] = db_cur.fetchall()

    retcode, stdout, stderr = pvc_common.run_os_command("uname -m")
    vm_data["system_architecture"] = stdout.strip()

    monitor_list = list()
    monitor_names = config["storage_hosts"]
    for monitor in monitor_names:
        monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
    vm_data["ceph_monitor_list"] = monitor_list
    vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
    vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]

    # Parse the script arguments
    script_arguments = dict()
    for argument in vm_data["script_arguments"]:
        argument_name, argument_data = argument.split("=")
        script_arguments[argument_name] = argument_data

    # Parse the runtime arguments
    if script_run_args is not None:
        for argument in script_run_args:
            argument_name, argument_data = argument.split("=")
            script_arguments[argument_name] = argument_data

    log_info(celery, f"Script arguments: {script_arguments}")
    vm_data["script_arguments"] = script_arguments

    log_info(
        celery,
        "VM configuration data:\n{}".format(
            json.dumps(vm_data, sort_keys=True, indent=2)
        ),
    )

    # Phase 2 - verification
    # * Ensure that at least one node has enough free RAM to hold the VM (becomes main host)
    # * Ensure that all networks are valid
    # * Ensure that there is enough disk space in the Ceph cluster for the disks
    # This is the "safe fail" step when an invalid configuration will be caught
    current_stage += 1
    update(
        celery,
        "Verifying configuration against cluster",
        current=current_stage,
        total=total_stages,
    )

    with open_zk(config) as zkhandler:
        # Verify that a VM with this name does not already exist
        if pvc_vm.searchClusterByName(zkhandler, vm_name):
            fail(
                celery,
                f"A VM with the name '{vm_name}' already exists in the cluster",
                exception=ClusterError,
            )

        # Verify that at least one host has enough free RAM to run the VM
        _discard, nodes = pvc_node.get_list(zkhandler, None)
        target_node = None
        last_free = 0
        for node in nodes:
            # Skip the node if it is not ready to run VMs
            if node["daemon_state"] != "run" or node["domain_state"] != "ready":
                continue
            # Skip the node if its free memory is less than the new VM's size, plus a 512MB buffer
            if node["memory"]["free"] < (vm_data["system_details"]["vram_mb"] + 512):
                continue
            # If this node has the most free, use it
            if node["memory"]["free"] > last_free:
                last_free = node["memory"]["free"]
                target_node = node["name"]
        # Raise if no node was found
        if not target_node:
            fail(
                celery,
                f"No ready cluster node contains at least {vm_data['system_details']['vram_mb']}+512 MB of free RAM",
                exception=ClusterError,
            )

        log_info(
            celery,
            f'Selecting target node "{target_node}" with "{last_free}" MB free RAM',
        )

        # Verify that all configured networks are present on the cluster
        cluster_networks, _discard = pvc_network.getClusterNetworkList(zkhandler)
        for network in vm_data["networks"]:
            vni = str(network["vni"])
            if vni not in cluster_networks and vni not in [
                "upstream",
                "cluster",
                "storage",
            ]:
                fail(
                    celery,
                    f'The network VNI "{vni}" is not present on the cluster',
                    exception=ClusterError,
                )

        log_info(celery, "All configured networks for VM are valid")

        # Verify that there is enough disk space free to provision all VM disks
        pools = dict()
        for volume in vm_data["volumes"]:
            if volume.get("source_volume") is not None:
                volume_data = pvc_ceph.getVolumeInformation(
                    zkhandler, volume["pool"], volume["source_volume"]
                )
                if not volume_data:
                    fail(
                        celery,
                        f"The source volume {volume['pool']}/{volume['source_volume']} could not be found",
                        exception=ClusterError,
                    )
                if volume["pool"] not in pools:
                    pools[volume["pool"]] = int(
                        pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
                        / 1024
                        / 1024
                        / 1024
                    )
                else:
                    pools[volume["pool"]] += int(
                        pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
                        / 1024
                        / 1024
                        / 1024
                    )
            else:
                if volume["pool"] not in pools:
                    pools[volume["pool"]] = volume["disk_size_gb"]
                else:
                    pools[volume["pool"]] += volume["disk_size_gb"]

        for pool in pools:
            try:
                pool_information = pvc_ceph.getPoolInformation(zkhandler, pool)
                if not pool_information:
                    raise
            except Exception:
                fail(
                    celery,
                    f'Pool "{pool}" is not present on the cluster',
                    exception=ClusterError,
                )
            pool_free_space_gb = int(
                pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024
            )
            pool_vm_usage_gb = int(pools[pool])

            if pool_vm_usage_gb >= pool_free_space_gb:
                fail(
                    celery,
                    f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB',
                    exception=ClusterError,
                )

        log_info(celery, "There is enough space on cluster to store VM volumes")

        # Verify that every specified filesystem is valid
        used_filesystems = list()
        for volume in vm_data["volumes"]:
            if volume.get("source_volume") is not None:
                continue
            if (
                volume.get("filesystem") is not None
                and volume["filesystem"] not in used_filesystems
            ):
                used_filesystems.append(volume["filesystem"])

        for filesystem in used_filesystems:
            if filesystem is None or filesystem == "None":
                continue
            elif filesystem == "swap":
                retcode, stdout, stderr = pvc_common.run_os_command("which mkswap")
                if retcode:
                    fail(
                        celery,
                        f"Failed to find binary for mkswap: {stderr}",
                        exception=ProvisioningError,
                    )
            else:
                retcode, stdout, stderr = pvc_common.run_os_command(
                    "which mkfs.{}".format(filesystem)
                )
                if retcode:
                    fail(
                        celery,
                        f"Failed to find binary for mkfs.{filesystem}: {stderr}",
                        exception=ProvisioningError,
                    )

        log_info(celery, "All selected filesystems are valid")

    # Phase 3 - provisioning script preparation
    # * Import the provisioning script as a library with importlib
    # * Ensure the required function(s) are present
    current_stage += 1
    update(
        celery,
        "Preparing provisioning script",
        current=current_stage,
        total=total_stages,
    )

    # Write the script out to a temporary file
    retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
    if retcode:
        fail(
            celery,
            f"Failed to create a temporary file: {stderr}",
            exception=ProvisioningError,
        )

    script_file = stdout.strip()
    with open(script_file, "w") as fh:
        fh.write(vm_data["script"])
        fh.write("\n")

    # Import the script file
    loader = importlib.machinery.SourceFileLoader("installer_script", script_file)
    spec = importlib.util.spec_from_loader(loader.name, loader)
    installer_script = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(installer_script)

    # Set up the VMBuilderScript object
    vm_builder = installer_script.VMBuilderScript(
        vm_name=vm_name,
        vm_id=vm_id,
        vm_profile=vm_profile,
        vm_data=vm_data,
    )

    log_info(celery, "Provisioning script imported successfully")

    # Create temporary directory for external chroot
    retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
    if retcode:
        fail(
            celery,
            f"Failed to create a temporary directory: {stderr}",
            exception=ProvisioningError,
        )

    temp_dir = stdout.strip()

    # Bind mount / to the chroot location /
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --bind --options ro / {temp_dir}"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount rootfs into {temp_dir} for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Mount tmpfs to the chroot location /tmp
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --type tmpfs tmpfs {temp_dir}/tmp"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /dev to the chroot location /dev
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --bind --options ro /dev {temp_dir}/dev"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /run to the chroot location /run
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --bind --options rw /run {temp_dir}/run"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /sys to the chroot location /sys
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --bind --options rw /sys {temp_dir}/sys"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /proc to the chroot location /proc
    retcode, stdout, stderr = pvc_common.run_os_command(
        f"mount --bind --options rw /proc {temp_dir}/proc"
    )
    if retcode:
        fail(
            celery,
            f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}",
            exception=ProvisioningError,
        )

    log_info(celery, "Chroot environment prepared successfully")

    def general_cleanup():
        log_info(celery, "Running upper cleanup steps")

        try:
            # Unmount bind-mounted devfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"umount {temp_dir}/dev"
            )
            # Unmount bind-mounted runfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"umount {temp_dir}/run"
            )
            # Unmount bind-mounted sysfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"umount {temp_dir}/sys"
            )
            # Unmount bind-mounted procfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"umount {temp_dir}/proc"
            )
            # Unmount bind-mounted tmpfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"umount {temp_dir}/tmp"
            )
            # Unmount bind-mounted rootfs on the chroot
            retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
        except Exception as e:
            # We don't care about fails during cleanup, log and continue
            log_warn(celery, f"Suberror during general cleanup unmounts: {e}")

        try:
            # Remove the temp_dir
            os.rmdir(temp_dir)
        except Exception as e:
            # We don't care about fails during cleanup, log and continue
            log_warn(celery, f"Suberror during general cleanup directory removal: {e}")

        try:
            # Remove the temporary script (don't fail if not removed)
            os.remove(script_file)
        except Exception as e:
            # We don't care about fails during cleanup, log and continue
            log_warn(celery, f"Suberror during general cleanup script removal: {e}")

    def fail_clean(celery, msg, exception=ProvisioningError):
        try:
            with chroot(temp_dir):
                vm_builder.cleanup()
        except Exception:
            # We're already failing, do the best we can
            pass
        try:
            general_cleanup()
        except Exception:
            # We're already failing, do the best we can
            pass
        fail(celery, msg, exception=exception)

    # Phase 4 - script: setup()
    # * Run pre-setup steps
    current_stage += 1
    update(
        celery, "Running script setup() step", current=current_stage, total=total_stages
    )

    try:
        with chroot(temp_dir):
            vm_builder.setup()
    except Exception as e:
        fail_clean(
            celery,
            f"Error in script setup() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 5 - script: create()
    # * Prepare the libvirt XML definition for the VM
    current_stage += 1
    update(
        celery,
        "Running script create() step",
        current=current_stage,
        total=total_stages,
    )

    if define_vm:
        try:
            with chroot(temp_dir):
                vm_schema = vm_builder.create()
        except Exception as e:
            fail_clean(
                celery,
                f"Error in script create() step: {e}",
                exception=ProvisioningError,
            )

        log_info(celery, "Generated VM schema:\n{}\n".format(vm_schema))

        log_info(celery, "Defining VM on cluster")
        node_limit = vm_data["system_details"]["node_limit"]
        if node_limit:
            node_limit = node_limit.split(",")
        node_selector = vm_data["system_details"]["node_selector"]
        node_autostart = vm_data["system_details"]["node_autostart"]
        migration_method = vm_data["system_details"]["migration_method"]
        with open_zk(config) as zkhandler:
            retcode, retmsg = pvc_vm.define_vm(
                zkhandler,
                vm_schema.strip(),
                target_node,
                node_limit,
                node_selector,
                node_autostart,
                migration_method,
                vm_profile,
                initial_state="provision",
            )
        log_info(celery, retmsg)
    else:
log_info("Skipping VM definition due to define_vm=False")

    # Phase 6 - script: prepare()
    # * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
    current_stage += 1
    update(
        celery,
        "Running script prepare() step",
        current=current_stage,
        total=total_stages,
    )

    try:
        with chroot(temp_dir):
            vm_builder.prepare()
    except Exception as e:
        fail_clean(
            celery,
            f"Error in script prepare() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 7 - script: install()
    # * Run installation with arguments
    current_stage += 1
    update(
        celery,
        "Running script install() step",
        current=current_stage,
        total=total_stages,
    )

    try:
        with chroot(temp_dir):
            vm_builder.install()
    except Exception as e:
        fail_clean(
            celery,
            f"Error in script install() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 8 - script: cleanup()
    # * Run cleanup steps
    current_stage += 1
    update(
        celery,
        "Running script cleanup() step",
        current=current_stage,
        total=total_stages,
    )

    try:
        with chroot(temp_dir):
            vm_builder.cleanup()
    except Exception as e:
        fail(
            celery,
            f"Error in script cleanup() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 9 - general cleanup
    # * Clean up the chroot from earlier
    current_stage += 1
    update(
        celery,
        "Running general cleanup steps",
        current=current_stage,
        total=total_stages,
    )

    general_cleanup()

    # Phase 10 - startup
    # * Start the VM in the PVC cluster
    current_stage += 1
    update(celery, "Starting VM", current=current_stage, total=total_stages)

    if start_vm:
        with open_zk(config) as zkhandler:
            success, message = pvc_vm.start_vm(zkhandler, vm_name)
        log_info(celery, message)

        end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
    else:
        end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'

    current_stage += 1
    return finish(
        celery,
        end_message,
        current=current_stage,
        total=total_stages,
    )
@@ -1,16 +0,0 @@
# Parallel Virtual Cluster Celery Worker daemon unit file

[Unit]
Description = Parallel Virtual Cluster Celery Worker daemon
After = network-online.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStart = /usr/share/pvc/pvcworkerd.sh
Restart = on-failure

[Install]
WantedBy = multi-user.target
@@ -1,36 +0,0 @@
#!/usr/bin/env bash

# pvcworkerd.sh - API Celery worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

CELERY_BIN="$( which celery )"

# This absolute hackery is needed because Celery got the bright idea to change how their
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
    10.*)
        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        ;;
    *)
        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        ;;
esac

${CELERY_BIN} ${CELERY_ARGS}
exit $?