Compare commits

...

25 Commits

Author SHA1 Message Date
23977b04fc Bump version to 0.9.37 2021-09-30 02:08:14 -04:00
bb1cca522f Revamp benchmark tests
1. Move to a time-based (60s) benchmark to avoid these taking an absurd
amount of time to show the same information.

2. Eliminate the 256k random benchmarks, since they don't really add
anything.

3. Add in a 4k single-queue benchmark as this might provide valuable
insight into latency.

4. Adjust the output to reflect the above changes.

While this does change the benchmarking, it should not invalidate any
existing benchmarks, since most of the test suite is unchanged (especially
the most important 4M sequential and 4K random tests). It simply removes
an unused entry and adds a more helpful one. The time-based change
should not significantly affect the results either; it just reduces the
total runtime for long tests and increases the runtime for quick tests to
provide a better picture.
2021-09-29 20:51:30 -04:00
9a4dce4e4c Add primary node to benchmark job name
Ensures the current primary node the job was run on is tracked, since
this may be relevant to the performance results.
2021-09-28 09:58:22 -04:00
f6f6f07488 Add timeouts to queue gets and adjust
Ensure that all keepalive timeouts are set (preventing the queue.get()
actions from blocking forever) and set the thread timeouts to line up as
well. Everything here is thus limited to keepalive_interval seconds
(default 5s) to keep it uniform.
2021-09-27 16:10:27 -04:00
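
To illustrate the pattern this commit describes, here is a minimal, hypothetical sketch of a bounded queue.get() paired with a matching thread join timeout; the function and variable names and the 5-second value are assumptions for demonstration only, not the daemon's actual code.

import queue
from threading import Thread

keepalive_interval = 5  # assumed default, in seconds

def collect(q):
    # Stand-in for a keepalive data-collection thread
    q.put('collected data')

q = queue.Queue()
collector = Thread(target=collect, args=(q,))
collector.start()

try:
    # Bounded get: never block longer than the keepalive interval
    data = q.get(timeout=keepalive_interval)
except queue.Empty:
    data = None  # treat a missing result as a failed collection

# Line the thread join timeout up with the same interval
collector.join(timeout=keepalive_interval)
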
142c999ce8 Re-add success log output during migration 2021-09-27 11:50:55 -04:00
1de069298c Fix missing character in log message 2021-09-27 00:49:43 -04:00
55221b3d97 Simplify VM migration down to 3 steps
Remove two superfluous synchronization steps which are not needed here,
since the exclusive lock handles that situation anyway.

Still does not fix the odd flush->unflush lock timeout bug, but it is now
better worked around: cancelling the other wait frees this up and allows
it to continue.
2021-09-27 00:03:20 -04:00
0d72798814 Work around synchronization lock issues
Make the block on stage C only wait for 900 seconds (15 minutes) to
prevent indefinite blocking.

The issue occurs if a VM is being received and the current unflush is
cancelled in favour of a flush. When this happens, this lock acquisition
seems to block for no obvious reason, and no other changes seem to affect it.
This is certainly some sort of locking bug within Kazoo but I can't
diagnose it as-is. Leave a TODO to look into this again in the future.
2021-09-26 23:26:21 -04:00
3638efc77e Improve log messages during VM migration 2021-09-26 23:15:38 -04:00
c2c888d684 Use event to non-block wait and fix inf wait 2021-09-26 22:55:39 -04:00
febef2e406 Track status of VM state thread 2021-09-26 22:55:21 -04:00
2a4f38e933 Simplify locking process for VM migration
Rather than using a cumbersome and overly complex ping-pong of read and
write locks, move to a much simpler process using exclusive locks.

Describing the process in ASCII or narrative is cumbersome, but the two
sides ping-pong via a set of exclusive locks and wait timers, synchronizing
by blocking on the exclusive lock. The end result is a much more
streamlined migration (taking about half the time, all things considered)
which should be less error-prone.
2021-09-26 22:08:07 -04:00
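
To illustrate the exclusive-lock hand-off described above, here is a minimal sketch using Kazoo's Lock recipe directly; the connection string, ZooKeeper path, identifier, and 30-second timeout are illustrative assumptions rather than the daemon's actual implementation (which goes through its own zkhandler wrapper).

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout

zk = KazooClient(hosts='127.0.0.1:2181')  # illustrative connection string
zk.start()

# Both sides contend for the same exclusive lock; whichever side holds it
# performs its phase of the migration, then releases it so the peer can proceed.
lock = zk.Lock('/domains/example/migrate_sync_lock', 'node1')

try:
    # Wait a bounded time for the peer before aborting the migration
    lock.acquire(timeout=30.0)
except LockTimeout:
    print('Timed out waiting for peer; aborting migration')
else:
    try:
        pass  # ... perform this side's migration phase ...
    finally:
        lock.release()

zk.stop()
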
3b805cdc34 Fix failure to connect to libvirt in keepalive
This should be caught and the thread aborted, rather than failing and
holding up keepalives.
2021-09-26 20:42:01 -04:00
06f0f7ed91 Fix several bugs in fence handling
1. Output from ipmitool was not being stripped, and stray newlines were
throwing off the comparisons. Fixes this.

2. Several stages were lacking meaningful messages. Adds these in so the
output is more clear about what is going on.

3. Reduce the sleep time after a fence to just 1x the keepalive_interval,
rather than 2x, because the latter seemed excessively long even for slow
IPMI interfaces, especially since we're checking the power state now
anyway.

4. Set the node daemon state to an explicit 'fenced' state after a
successful fence to indicate to users that the node was indeed fenced
successfully and not still 'dead'.
2021-09-26 20:07:30 -04:00
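
As a small illustration of the first fix above, trailing newlines from ipmitool defeat an exact string comparison unless the output is stripped first; the command arguments and expected output string below are illustrative only.

import subprocess

# Query chassis power state via ipmitool (arguments are illustrative)
result = subprocess.run(
    ['ipmitool', '-I', 'lanplus', '-H', 'bmc.example.com',
     '-U', 'admin', '-P', 'password', 'chassis', 'power', 'status'],
    capture_output=True, text=True,
)

# Without .strip(), the trailing newline makes this comparison always fail
if result.stdout.strip() == 'Chassis Power is off':
    print('Fence confirmed: node is powered off')
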
fd040ab45a Ensure pvc-flush is after network-online 2021-09-26 17:40:42 -04:00
e23e2dd9bf Fix typo in log message 2021-09-26 03:35:30 -04:00
ee4266f8ca Tweak CLI helptext around OSD actions
Adds some more detail about OSD commands and their values.
2021-09-26 01:29:23 -04:00
0f02c5eaef Fix typo in sgdisk command options 2021-09-26 00:59:05 -04:00
075abec5fe Use re.search instead of re.match
Required since we're not matching the start of the string.
2021-09-26 00:55:29 -04:00
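
A one-line illustration of the difference: re.match only matches at the beginning of the string, so it cannot find a token in the middle of a device path, while re.search scans the whole string. The device path is just an example value.

import re

device = '/dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0'

print(re.match(r'by-path', device))   # None: 'by-path' is not at the start
print(re.search(r'by-path', device))  # a Match object: found mid-string
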
3a1cbf8d01 Raise basic exceptions in CephInstance
Avoids using a bare 'raise' with no active exception to re-raise on failures.
2021-09-26 00:50:10 -04:00
a438a4155a Fix OSD creation for partition paths and fix gdisk
The previous implementation did not work with /dev/nvme devices or any
/dev/disk/by-* devices due to logic errors in the partition naming
scheme, so fix these, and be explicit about what is supported in the PVC
CLI command output.

The 'echo | gdisk' implementation of partition creation also did not
work due to limitations of subprocess.run; instead, use sgdisk which
allows these commands to be written out explicitly and is included in
the same package as gdisk.
2021-09-26 00:12:28 -04:00
65df807b09 Add support for configurable OSD DB ratios
The default of 0.05 (5%) is likely ideal in the initial implementation,
but allow this to be set explicitly for maximum flexibility in
space-constrained or performance-critical use-cases.
2021-09-24 01:06:39 -04:00
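
For a sense of scale, the DB logical volume size is simply the OSD size multiplied by the ratio (the same arithmetic the node daemon performs in the diff below); the OSD size here is an illustrative value.

# Illustrative sizing: a 4 TB OSD with the default 5% ratio
osd_size_bytes = 4 * 1000**4   # 4 TB data device
ext_db_ratio = 0.05            # default ratio

osd_db_size_mib = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)
print(osd_db_size_mib)         # 190734 MiB, i.e. roughly 200 GB for the DB LV
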
d0f3e9e285 Bump version to 0.9.36 2021-09-23 14:01:38 -04:00
adc8a5a3bc Add separate OSD DB device support
Adds in three parts:

1. Create an API endpoint to create OSD DB volume groups on a device.
Passed through to the node via the same command pipeline as
creating/removing OSDs, and creates a volume group with a fixed name
(osd-db).

2. Adds API support for specifying whether or not to use this DB volume
group when creating a new OSD via the "ext_db" flag. Naming and sizing
is fixed for simplicity and based on Ceph recommendations (5% of OSD
size). The Zookeeper schema tracks the block device to use during
removal.

3. Adds CLI support for the new and modified API endpoints, as well as
displaying the block device and DB block device in the OSD list.

While I debated supporting adding a DB device to an existing OSD, in
practice this ended up being a very complex operation involving stopping
the OSD and setting some options, so this is not supported; this can be
specified during OSD creation only.

Closes #142
2021-09-23 13:59:49 -04:00
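
To tie the three parts together from the client side, here is a minimal, hypothetical driver sketch using the CLI library functions added in this change; the import path, the contents of config (a real client configuration needs the API address, key, and so on), and the node/device/weight values are all assumptions for illustration.

import pvc.cli_lib.ceph as pvc_ceph  # assumed module path

config = {}  # placeholder; a real client loads a full cluster configuration here

# 1. Create the OSD database volume group on a fast device (once per node)
retstatus, message = pvc_ceph.ceph_osd_db_vg_add(config, 'node1', '/dev/nvme0n1')
print(message)

# 2. Create a new OSD that places its DB/WAL on that volume group,
#    using the default 5% size ratio
retstatus, message = pvc_ceph.ceph_osd_add(
    config, 'node1', '/dev/sdb', 1.0, ext_db_flag=True, ext_db_ratio=0.05)
print(message)
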
df277edf1c Move console watcher stop try up
Could cause an exception if d_domain is not defined yet.
2021-09-22 16:02:04 -04:00
22 changed files with 761 additions and 230 deletions

View File

@ -1 +1 @@
0.9.35
0.9.37

View File

@ -55,6 +55,23 @@ While PVC's API and internals aren't very screenshot-worthy, here is some exampl
## Changelog
#### v0.9.37
* [All] Adds support for configurable OSD DB size ratios
* [Node Daemon] Fixes bugs with OSD creation
* [Node Daemon] Fixes exception bugs in CephInstance
* [CLI Client] Adjusts descriptions around Ceph OSDs
* [Node Daemon] Fixes ordering of pvc-flush unit
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
* [Node Daemon] Simplifies locking for and speeds up VM migrations
* [Node Daemon] Fixes bugs in queue get timeouts
* [API Daemon] Adjusts benchmark test jobs configuration and naming
#### v0.9.36
* [Node Daemon] Fixes a bug during early cleanup
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
#### v0.9.35
* [Node Daemon] Fixes several bugs and crashes in node daemon

View File

@ -25,7 +25,7 @@ import yaml
from distutils.util import strtobool as dustrtobool
# Daemon version
version = '0.9.35'
version = '0.9.37'
# API version
API_VERSION = 1.0

View File

@ -37,12 +37,12 @@ class BenchmarkError(Exception):
"""
An exception that results from the Benchmark job.
"""
def __init__(self, message, cur_time=None, db_conn=None, db_cur=None, zkhandler=None):
def __init__(self, message, job_name=None, db_conn=None, db_cur=None, zkhandler=None):
self.message = message
if cur_time is not None:
if job_name is not None:
# Clean up our dangling result
query = "DELETE FROM storage_benchmarks WHERE job = %s;"
args = (cur_time,)
args = (job_name,)
db_cur.execute(query, args)
db_conn.commit()
# Close the database connections cleanly
@ -111,10 +111,6 @@ def run_benchmark(self, pool):
time.sleep(2)
cur_time = datetime.now().isoformat(timespec='seconds')
print("Starting storage benchmark '{}' on pool '{}'".format(cur_time, pool))
# Phase 0 - connect to databases
try:
db_conn, db_cur = open_database(config)
@ -129,14 +125,20 @@ def run_benchmark(self, pool):
print('FATAL - failed to connect to Zookeeper')
raise Exception
print("Storing running status for job '{}' in database".format(cur_time))
cur_time = datetime.now().isoformat(timespec='seconds')
cur_primary = zkhandler.read('base.config.primary_node')
job_name = '{}_{}'.format(cur_time, cur_primary)
print("Starting storage benchmark '{}' on pool '{}'".format(job_name, pool))
print("Storing running status for job '{}' in database".format(job_name))
try:
query = "INSERT INTO storage_benchmarks (job, result) VALUES (%s, %s);"
args = (cur_time, "Running",)
args = (job_name, "Running",)
db_cur.execute(query, args)
db_conn.commit()
except Exception as e:
raise BenchmarkError("Failed to store running status: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
raise BenchmarkError("Failed to store running status: {}".format(e), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
# Phase 1 - volume preparation
self.update_state(state='RUNNING', meta={'current': 1, 'total': 3, 'status': 'Creating benchmark volume'})
@ -147,7 +149,7 @@ def run_benchmark(self, pool):
# Create the RBD volume
retcode, retmsg = pvc_ceph.add_volume(zkhandler, pool, volume, "8G")
if not retcode:
raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
else:
print(retmsg)
@ -169,72 +171,83 @@ def run_benchmark(self, pool):
test_matrix = {
'seq_read': {
'direction': 'read',
'iodepth': '64',
'bs': '4M',
'rw': 'read'
},
'seq_write': {
'direction': 'write',
'iodepth': '64',
'bs': '4M',
'rw': 'write'
},
'rand_read_4M': {
'direction': 'read',
'iodepth': '64',
'bs': '4M',
'rw': 'randread'
},
'rand_write_4M': {
'direction': 'write',
'iodepth': '64',
'bs': '4M',
'rw': 'randwrite'
},
'rand_read_256K': {
'direction': 'read',
'bs': '256K',
'rw': 'randread'
},
'rand_write_256K': {
'direction': 'write',
'bs': '256K',
'rw': 'randwrite'
},
'rand_read_4K': {
'direction': 'read',
'iodepth': '64',
'bs': '4K',
'rw': 'randread'
},
'rand_write_4K': {
'direction': 'write',
'iodepth': '64',
'bs': '4K',
'rw': 'randwrite'
}
},
'rand_read_4K_lowdepth': {
'direction': 'read',
'iodepth': '1',
'bs': '4K',
'rw': 'randread'
},
'rand_write_4K_lowdepth': {
'direction': 'write',
'iodepth': '1',
'bs': '4K',
'rw': 'randwrite'
},
}
parsed_results = dict()
for test in test_matrix:
print("Running test '{}'".format(test))
fio_cmd = """
fio \
--output-format=terse \
--terse-version=5 \
--name={test} \
--ioengine=rbd \
--pool={pool} \
--rbdname={volume} \
--output-format=terse \
--terse-version=5 \
--direct=1 \
--randrepeat=1 \
--iodepth=64 \
--size=8G \
--name={test} \
--iodepth={iodepth} \
--numjobs=1 \
--time_based \
--runtime=60 \
--bs={bs} \
--readwrite={rw}
""".format(
test=test,
pool=pool,
volume=volume,
test=test,
iodepth=test_matrix[test]['iodepth'],
bs=test_matrix[test]['bs'],
rw=test_matrix[test]['rw'])
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
if retcode:
raise BenchmarkError("Failed to run fio test: {}".format(stderr), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
raise BenchmarkError("Failed to run fio test: {}".format(stderr), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
# Parse the terse results to avoid storing tons of junk
# Reference: https://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
@ -437,18 +450,18 @@ def run_benchmark(self, pool):
# Remove the RBD volume
retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, volume)
if not retcode:
raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
else:
print(retmsg)
print("Storing result of tests for job '{}' in database".format(cur_time))
print("Storing result of tests for job '{}' in database".format(job_name))
try:
query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;"
args = (json.dumps(parsed_results), cur_time)
args = (json.dumps(parsed_results), job_name)
db_cur.execute(query, args)
db_conn.commit()
except Exception as e:
raise BenchmarkError("Failed to store test results: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
raise BenchmarkError("Failed to store test results: {}".format(e), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
close_database(db_conn, db_cur)
zkhandler.disconnect()

View File

@ -3599,6 +3599,52 @@ class API_Storage_Ceph_Option(Resource):
api.add_resource(API_Storage_Ceph_Option, '/storage/ceph/option')
# /storage/ceph/osddb
class API_Storage_Ceph_OSDDB_Root(Resource):
@RequestParser([
{'name': 'node', 'required': True, 'helptext': "A valid node must be specified."},
{'name': 'device', 'required': True, 'helptext': "A valid device must be specified."},
])
@Authenticator
def post(self, reqargs):
"""
Add a Ceph OSD database volume group to the cluster
Note: This task may take up to 30s to complete and return
---
tags:
- storage / ceph
parameters:
- in: query
name: node
type: string
required: true
description: The PVC node to create the OSD DB volume group on
- in: query
name: device
type: string
required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) to create the OSD DB volume group on
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
"""
return api_helper.ceph_osd_db_vg_add(
reqargs.get('node', None),
reqargs.get('device', None)
)
api.add_resource(API_Storage_Ceph_OSDDB_Root, '/storage/ceph/osddb')
# /storage/ceph/osd
class API_Storage_Ceph_OSD_Root(Resource):
@RequestParser([
@ -3619,6 +3665,12 @@ class API_Storage_Ceph_OSD_Root(Resource):
id:
type: string (containing integer)
description: The Ceph ID of the OSD
device:
type: string
description: The OSD data block device
db_device:
type: string
description: The OSD database/WAL block device (logical volume); empty if not applicable
stats:
type: object
properties:
@ -3698,6 +3750,8 @@ class API_Storage_Ceph_OSD_Root(Resource):
{'name': 'node', 'required': True, 'helptext': "A valid node must be specified."},
{'name': 'device', 'required': True, 'helptext': "A valid device must be specified."},
{'name': 'weight', 'required': True, 'helptext': "An OSD weight must be specified."},
{'name': 'ext_db', 'required': False, 'helptext': "Whether to use an external OSD DB LV device."},
{'name': 'ext_db_ratio', 'required': False, 'helptext': "Decimal size ratio of the external OSD DB LV device."},
])
@Authenticator
def post(self, reqargs):
@ -3723,6 +3777,16 @@ class API_Storage_Ceph_OSD_Root(Resource):
type: number
required: true
description: The Ceph CRUSH weight for the OSD
- in: query
name: ext_db
type: boolean
required: false
description: Whether to use an external OSD DB LV device
- in: query
name: ext_db_ratio
type: float
required: false
description: Decimal ratio of total OSD size for the external OSD DB LV device, default 0.05 (5%)
responses:
200:
description: OK
@ -3738,7 +3802,9 @@ class API_Storage_Ceph_OSD_Root(Resource):
return api_helper.ceph_osd_add(
reqargs.get('node', None),
reqargs.get('device', None),
reqargs.get('weight', None)
reqargs.get('weight', None),
reqargs.get('ext_db', False),
float(reqargs.get('ext_db_ratio', 0.05)),
)

View File

@ -1274,11 +1274,29 @@ def ceph_osd_state(zkhandler, osd):
@ZKConnection(config)
def ceph_osd_add(zkhandler, node, device, weight):
def ceph_osd_db_vg_add(zkhandler, node, device):
"""
Add a Ceph OSD database VG to the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.add_osd_db_vg(zkhandler, node, device)
if retflag:
retcode = 200
else:
retcode = 400
output = {
'message': retdata.replace('\"', '\'')
}
return output, retcode
@ZKConnection(config)
def ceph_osd_add(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
"""
Add a Ceph OSD to the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.add_osd(zkhandler, node, device, weight)
retflag, retdata = pvc_ceph.add_osd(zkhandler, node, device, weight, ext_db_flag, ext_db_ratio)
if retflag:
retcode = 200

View File

@ -149,6 +149,31 @@ def format_raw_output(status_data):
return '\n'.join(ainformation)
#
# OSD DB VG functions
#
def ceph_osd_db_vg_add(config, node, device):
"""
Add new Ceph OSD database volume group
API endpoint: POST /api/v1/storage/ceph/osddb
API arguments: node={node}, device={device}
API schema: {"message":"{data}"}
"""
params = {
'node': node,
'device': device
}
response = call_api(config, 'post', '/storage/ceph/osddb', params=params)
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json().get('message', '')
#
# OSD functions
#
@ -197,18 +222,20 @@ def ceph_osd_list(config, limit):
return False, response.json().get('message', '')
def ceph_osd_add(config, node, device, weight):
def ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio):
"""
Add new Ceph OSD
API endpoint: POST /api/v1/storage/ceph/osd
API arguments: node={node}, device={device}, weight={weight}
API arguments: node={node}, device={device}, weight={weight}, ext_db={ext_db_flag}, ext_db_ratio={ext_db_ratio}
API schema: {"message":"{data}"}
"""
params = {
'node': node,
'device': device,
'weight': weight
'weight': weight,
'ext_db': ext_db_flag,
'ext_db_ratio': ext_db_ratio
}
response = call_api(config, 'post', '/storage/ceph/osd', params=params)
@ -312,13 +339,15 @@ def format_list_osd(osd_list):
osd_list_output = []
osd_id_length = 3
osd_node_length = 5
osd_device_length = 6
osd_db_device_length = 9
osd_up_length = 4
osd_in_length = 4
osd_size_length = 5
osd_weight_length = 3
osd_reweight_length = 5
osd_pgs_length = 4
osd_node_length = 5
osd_used_length = 5
osd_free_length = 6
osd_util_length = 6
@ -358,10 +387,21 @@ def format_list_osd(osd_list):
if _osd_id_length > osd_id_length:
osd_id_length = _osd_id_length
# Set the OSD node length
_osd_node_length = len(osd_information['stats']['node']) + 1
if _osd_node_length > osd_node_length:
osd_node_length = _osd_node_length
# Set the OSD device length
_osd_device_length = len(osd_information['device']) + 1
if _osd_device_length > osd_device_length:
osd_device_length = _osd_device_length
# Set the OSD db_device length
_osd_db_device_length = len(osd_information['db_device']) + 1
if _osd_db_device_length > osd_db_device_length:
osd_db_device_length = _osd_db_device_length
# Set the size and length
_osd_size_length = len(str(osd_information['stats']['size'])) + 1
if _osd_size_length > osd_size_length:
@ -422,12 +462,12 @@ def format_list_osd(osd_list):
osd_list_output.append('{bold}{osd_header: <{osd_header_length}} {state_header: <{state_header_length}} {details_header: <{details_header_length}} {read_header: <{read_header_length}} {write_header: <{write_header_length}}{end_bold}'.format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
osd_header_length=osd_id_length + osd_node_length + 1,
osd_header_length=osd_id_length + osd_node_length + osd_device_length + osd_db_device_length + 3,
state_header_length=osd_up_length + osd_in_length + 1,
details_header_length=osd_size_length + osd_pgs_length + osd_weight_length + osd_reweight_length + osd_used_length + osd_free_length + osd_util_length + osd_var_length + 7,
read_header_length=osd_rdops_length + osd_rddata_length + 1,
write_header_length=osd_wrops_length + osd_wrdata_length + 1,
osd_header='OSDs ' + ''.join(['-' for _ in range(5, osd_id_length + osd_node_length)]),
osd_header='OSDs ' + ''.join(['-' for _ in range(5, osd_id_length + osd_node_length + osd_device_length + osd_db_device_length + 2)]),
state_header='State ' + ''.join(['-' for _ in range(6, osd_up_length + osd_in_length)]),
details_header='Details ' + ''.join(['-' for _ in range(8, osd_size_length + osd_pgs_length + osd_weight_length + osd_reweight_length + osd_used_length + osd_free_length + osd_util_length + osd_var_length + 6)]),
read_header='Read ' + ''.join(['-' for _ in range(5, osd_rdops_length + osd_rddata_length)]),
@ -437,6 +477,8 @@ def format_list_osd(osd_list):
osd_list_output.append('{bold}\
{osd_id: <{osd_id_length}} \
{osd_node: <{osd_node_length}} \
{osd_device: <{osd_device_length}} \
{osd_db_device: <{osd_db_device_length}} \
{osd_up: <{osd_up_length}} \
{osd_in: <{osd_in_length}} \
{osd_size: <{osd_size_length}} \
@ -456,6 +498,8 @@ def format_list_osd(osd_list):
end_bold=ansiprint.end(),
osd_id_length=osd_id_length,
osd_node_length=osd_node_length,
osd_device_length=osd_device_length,
osd_db_device_length=osd_db_device_length,
osd_up_length=osd_up_length,
osd_in_length=osd_in_length,
osd_size_length=osd_size_length,
@ -472,6 +516,8 @@ def format_list_osd(osd_list):
osd_rddata_length=osd_rddata_length,
osd_id='ID',
osd_node='Node',
osd_device='Block',
osd_db_device='DB Block',
osd_up='Up',
osd_in='In',
osd_size='Size',
@ -500,10 +546,16 @@ def format_list_osd(osd_list):
osd_util = round(osd_information['stats']['utilization'], 2)
osd_var = round(osd_information['stats']['var'], 2)
osd_db_device = osd_information['db_device']
if not osd_db_device:
osd_db_device = 'N/A'
# Format the output header
osd_list_output.append('{bold}\
{osd_id: <{osd_id_length}} \
{osd_node: <{osd_node_length}} \
{osd_device: <{osd_device_length}} \
{osd_db_device: <{osd_db_device_length}} \
{osd_up_colour}{osd_up: <{osd_up_length}}{end_colour} \
{osd_in_colour}{osd_in: <{osd_in_length}}{end_colour} \
{osd_size: <{osd_size_length}} \
@ -524,6 +576,8 @@ def format_list_osd(osd_list):
end_colour=ansiprint.end(),
osd_id_length=osd_id_length,
osd_node_length=osd_node_length,
osd_device_length=osd_device_length,
osd_db_device_length=osd_db_device_length,
osd_up_length=osd_up_length,
osd_in_length=osd_in_length,
osd_size_length=osd_size_length,
@ -540,6 +594,8 @@ def format_list_osd(osd_list):
osd_rddata_length=osd_rddata_length,
osd_id=osd_information['id'],
osd_node=osd_information['stats']['node'],
osd_device=osd_information['device'],
osd_db_device=osd_db_device,
osd_up_colour=osd_up_colour,
osd_up=osd_up_flag,
osd_in_colour=osd_in_colour,
@ -1550,10 +1606,10 @@ def format_info_benchmark(config, benchmark_information):
"seq_write": "Sequential Write (4M blocks)",
"rand_read_4M": "Random Read (4M blocks)",
"rand_write_4M": "Random Write (4M blocks)",
"rand_read_256K": "Random Read (256K blocks)",
"rand_write_256K": "Random Write (256K blocks)",
"rand_read_4K": "Random Read (4K blocks)",
"rand_write_4K": "Random Write (4K blocks)"
"rand_write_4K": "Random Write (4K blocks)",
"rand_read_4K_lowdepth": "Random Read (4K blocks, single-queue)",
"rand_write_4K_lowdepth": "Random Write (4K blocks, single-queue)",
}
test_name_length = 30
@ -1566,7 +1622,16 @@ def format_info_benchmark(config, benchmark_information):
cpuutil_label_length = 11
cpuutil_column_length = 9
# Work around old results that did not have these tests
if 'rand_read_4K_lowdepth' not in benchmark_details:
del nice_test_name_map['rand_read_4K_lowdepth']
del nice_test_name_map['rand_write_4K_lowdepth']
for test in benchmark_details:
# Work around old results that had these obsolete tests
if test == 'rand_read_256K' or test == 'rand_write_256K':
continue
_test_name_length = len(nice_test_name_map[test])
if _test_name_length > test_name_length:
test_name_length = _test_name_length
@ -1603,6 +1668,10 @@ def format_info_benchmark(config, benchmark_information):
cpuutil_column_length = _element_length
for test in benchmark_details:
# Work around old results that had these obsolete tests
if test == 'rand_read_256K' or test == 'rand_write_256K':
continue
ainformation.append('')
test_details = benchmark_details[test]

View File

@ -2526,7 +2526,7 @@ def ceph_benchmark_run(pool):
Run a storage benchmark on POOL in the background.
"""
try:
click.confirm('NOTE: Storage benchmarks generate significant load on the cluster and can take a very long time to complete on slow storage. They should be run sparingly. Continue', prompt_suffix='? ', abort=True)
click.confirm('NOTE: Storage benchmarks take approximately 8 minutes to run and generate significant load on the storage cluster; they should be run sparingly. Continue', prompt_suffix='? ', abort=True)
except Exception:
exit(0)
@ -2583,6 +2583,38 @@ def ceph_osd():
pass
###############################################################################
# pvc storage osd create-db-vg
###############################################################################
@click.command(name='create-db-vg', short_help='Create new OSD database volume group.')
@click.argument(
'node'
)
@click.argument(
'device'
)
@click.option(
'-y', '--yes', 'confirm_flag',
is_flag=True, default=False,
help='Confirm the creation.'
)
@cluster_req
def ceph_osd_create_db_vg(node, device, confirm_flag):
"""
Create a new Ceph OSD database volume group on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
This volume group will be used for Ceph OSD database and WAL functionality if the '--ext-db' flag is passed to newly-created OSDs during 'pvc storage osd add'. DEVICE should be an extremely fast SSD device (NVMe, Intel Optane, etc.) which is significantly faster than the normal OSD disks and with very high write endurance. Only one OSD database volume group on a single physical device is supported per node, so it must be fast and large enough to act as an effective OSD database device for all OSDs on the node. Attempting to add additional database volume groups after the first will fail.
"""
if not confirm_flag and not config['unsafe']:
try:
click.confirm('Destroy all data and create a new OSD database volume group on {}:{}'.format(node, device), prompt_suffix='? ', abort=True)
except Exception:
exit(0)
retcode, retmsg = pvc_ceph.ceph_osd_db_vg_add(config, node, device)
cleanup(retcode, retmsg)
###############################################################################
# pvc storage osd add
###############################################################################
@ -2598,15 +2630,31 @@ def ceph_osd():
default=1.0, show_default=True,
help='Weight of the OSD within the CRUSH map.'
)
@click.option(
'-d', '--ext-db', 'ext_db_flag',
is_flag=True, default=False,
help='Use an external database logical volume for this OSD.'
)
@click.option(
'-r', '--ext-db-ratio', 'ext_db_ratio',
default=0.05, show_default=True, type=float,
help='Decimal ratio of the external database logical volume to the OSD size.'
)
@click.option(
'-y', '--yes', 'confirm_flag',
is_flag=True, default=False,
help='Confirm the removal'
help='Confirm the creation.'
)
@cluster_req
def ceph_osd_add(node, device, weight, confirm_flag):
def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag):
"""
Add a new Ceph OSD on node NODE with block device DEVICE.
Add a new Ceph OSD on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
The weight of an OSD should reflect the ratio of the OSD to other OSDs in the storage cluster. For example, if all OSDs are the same size as recommended for PVC, 1 (the default) is a valid weight so that all are treated identically. If a new OSD is added later which is 4x the size of the existing OSDs, the new OSD's weight should then be 4 to tell the cluster that 4x the data can be stored on the OSD. Weights can also be tweaked for performance reasons, since OSDs with more data will incur more I/O load. For more information about CRUSH weights, please see the Ceph documentation.
If '--ext-db' is specified, the OSD database and WAL will be placed on a new logical volume in NODE's OSD database volume group; it must exist or OSD creation will fail. See the 'pvc storage osd create-db-vg' command for more details.
The default '--ext-db-ratio' of 0.05 (5%) is sufficient for most RBD workloads and OSD sizes, though this can be adjusted based on the sizes of the OSD(s) and the underlying database device. Ceph documentation recommends at least 0.02 (2%) for RBD use-cases, and higher values may improve WAL performance under write-heavy workloads with fewer OSDs per node.
"""
if not confirm_flag and not config['unsafe']:
try:
@ -2614,7 +2662,7 @@ def ceph_osd_add(node, device, weight, confirm_flag):
except Exception:
exit(0)
retcode, retmsg = pvc_ceph.ceph_osd_add(config, node, device, weight)
retcode, retmsg = pvc_ceph.ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio)
cleanup(retcode, retmsg)
@ -2635,7 +2683,7 @@ def ceph_osd_remove(osdid, confirm_flag):
"""
Remove a Ceph OSD with ID OSDID.
DANGER: This will completely remove the OSD from the cluster. OSDs will rebalance which may negatively affect performance or available space.
DANGER: This will completely remove the OSD from the cluster. OSDs will rebalance which will negatively affect performance and available space. It is STRONGLY RECOMMENDED to set an OSD out (using 'pvc storage osd out') and allow the cluster to fully rebalance (verified with 'pvc storage status') before removing an OSD.
"""
if not confirm_flag and not config['unsafe']:
try:
@ -4856,6 +4904,7 @@ ceph_benchmark.add_command(ceph_benchmark_run)
ceph_benchmark.add_command(ceph_benchmark_info)
ceph_benchmark.add_command(ceph_benchmark_list)
ceph_osd.add_command(ceph_osd_create_db_vg)
ceph_osd.add_command(ceph_osd_add)
ceph_osd.add_command(ceph_osd_remove)
ceph_osd.add_command(ceph_osd_in)

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='pvc',
version='0.9.35',
version='0.9.37',
packages=['pvc', 'pvc.cli_lib'],
install_requires=[
'Click',

View File

@ -180,20 +180,63 @@ def getClusterOSDList(zkhandler):
def getOSDInformation(zkhandler, osd_id):
# Get the devices
osd_device = zkhandler.read(('osd.device', osd_id))
osd_db_device = zkhandler.read(('osd.db_device', osd_id))
# Parse the stats data
osd_stats_raw = zkhandler.read(('osd.stats', osd_id))
osd_stats = dict(json.loads(osd_stats_raw))
osd_information = {
'id': osd_id,
'stats': osd_stats
'device': osd_device,
'db_device': osd_db_device,
'stats': osd_stats,
}
return osd_information
# OSD DB VG actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd_db_vg(zkhandler, node, device):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
# Tell the cluster to create a new OSD for the host
add_osd_db_vg_string = 'db_vg_add {},{}'.format(node, device)
zkhandler.write([
('base.cmd.ceph', add_osd_db_vg_string)
])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock('base.cmd.ceph'):
try:
result = zkhandler.read('base.cmd.ceph').split()[0]
if result == 'success-db_vg_add':
message = 'Created new OSD database VG at "{}" on node "{}".'.format(device, node)
success = True
else:
message = 'ERROR: Failed to create new OSD database VG; check node logs for details.'
success = False
except Exception:
message = 'ERROR: Command ignored by node.'
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock('base.cmd.ceph'):
time.sleep(0.5)
zkhandler.write([
('base.cmd.ceph', '')
])
return success, message
# OSD addition and removal uses the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd(zkhandler, node, device, weight):
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
@ -204,7 +247,7 @@ def add_osd(zkhandler, node, device, weight):
return False, 'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(device, node, block_osd)
# Tell the cluster to create a new OSD for the host
add_osd_string = 'osd_add {},{},{}'.format(node, device, weight)
add_osd_string = 'osd_add {},{},{},{},{}'.format(node, device, weight, ext_db_flag, ext_db_ratio)
zkhandler.write([
('base.cmd.ceph', add_osd_string)
])

View File

@ -0,0 +1 @@
{"version": "5", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, 
"osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

View File

@ -466,7 +466,7 @@ class ZKHandler(object):
#
class ZKSchema(object):
# Current version
_version = 4
_version = 5
# Root for doing nested keys
_schema_root = ''
@ -636,6 +636,7 @@ class ZKSchema(object):
'id': '', # The root key
'node': '/node',
'device': '/device',
'db_device': '/db_device',
'stats': '/stats'
},
# The schema of an individual pool entry (/ceph/pools/{pool_name})

debian/changelog
View File

@ -1,3 +1,24 @@
pvc (0.9.37-0) unstable; urgency=high
* [All] Adds support for configurable OSD DB size ratios
* [Node Daemon] Fixes bugs with OSD creation
* [Node Daemon] Fixes exception bugs in CephInstance
* [CLI Client] Adjusts descriptions around Ceph OSDs
* [Node Daemon] Fixes ordering of pvc-flush unit
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
* [Node Daemon] Simplifies locking for and speeds up VM migrations
* [Node Daemon] Fixes bugs in queue get timeouts
* [API Daemon] Adjusts benchmark test jobs configuration and naming
-- Joshua M. Boniface <joshua@boniface.me> Thu, 30 Sep 2021 02:02:53 -0400
pvc (0.9.36-0) unstable; urgency=high
* [Node Daemon] Fixes a bug during early cleanup
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
-- Joshua M. Boniface <joshua@boniface.me> Thu, 23 Sep 2021 14:01:38 -0400
pvc (0.9.35-0) unstable; urgency=high
* [Node Daemon] Fixes several bugs and crashes in node daemon

View File

@ -55,6 +55,23 @@ While PVC's API and internals aren't very screenshot-worthy, here is some exampl
## Changelog
#### v0.9.37
* [All] Adds support for configurable OSD DB size ratios
* [Node Daemon] Fixes bugs with OSD creation
* [Node Daemon] Fixes exception bugs in CephInstance
* [CLI Client] Adjusts descriptions around Ceph OSDs
* [Node Daemon] Fixes ordering of pvc-flush unit
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
* [Node Daemon] Simplifies locking for and speeds up VM migrations
* [Node Daemon] Fixes bugs in queue get timeouts
* [API Daemon] Adjusts benchmark test jobs configuration and naming
#### v0.9.36
* [Node Daemon] Fixes a bug during early cleanup
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
#### v0.9.35
* [Node Daemon] Fixes several bugs and crashes in node daemon

View File

@ -520,6 +520,14 @@
},
"osd": {
"properties": {
"db_device": {
"description": "The OSD database/WAL block device (logical volume); empty if not applicable",
"type": "string"
},
"device": {
"description": "The OSD data block device",
"type": "string"
},
"id": {
"description": "The Ceph ID of the OSD",
"type": "string (containing integer)"
@ -5011,6 +5019,20 @@
"name": "weight",
"required": true,
"type": "number"
},
{
"description": "Whether to use an external OSD DB LV device",
"in": "query",
"name": "ext_db",
"required": false,
"type": "boolean"
},
{
"description": "Decimal ratio of total OSD size for the external OSD DB LV device, default 0.05 (5%)",
"in": "query",
"name": "ext_db_ratio",
"required": false,
"type": "float"
}
],
"responses": {
@ -5133,6 +5155,45 @@
]
}
},
"/api/v1/storage/ceph/osddb": {
"post": {
"description": "Note: This task may take up to 30s to complete and return",
"parameters": [
{
"description": "The PVC node to create the OSD DB volume group on",
"in": "query",
"name": "node",
"required": true,
"type": "string"
},
{
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) to create the OSD DB volume group on",
"in": "query",
"name": "device",
"required": true,
"type": "string"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Add a Ceph OSD database volume group to the cluster",
"tags": [
"storage / ceph"
]
}
},
"/api/v1/storage/ceph/pool": {
"get": {
"description": "",
@ -6248,6 +6309,68 @@
]
}
},
"/api/v1/vm/{vm}/device": {
"delete": {
"description": "",
"parameters": [
{
"description": "The raw Libvirt XML definition of the device to detach",
"in": "query",
"name": "xml",
"required": true,
"type": "string"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Hot-detach device XML to {vm}",
"tags": [
"vm"
]
},
"post": {
"description": "",
"parameters": [
{
"description": "The raw Libvirt XML definition of the device to attach",
"in": "query",
"name": "xml",
"required": true,
"type": "string"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Hot-attach device XML to {vm}",
"tags": [
"vm"
]
}
},
"/api/v1/vm/{vm}/locks": {
"post": {
"description": "",

View File

@ -2,7 +2,7 @@
[Unit]
Description = Parallel Virtual Cluster autoflush daemon
After = pvcnoded.service pvcapid.service zookeeper.service libvirtd.service ssh.service ceph.target
After = pvcnoded.service pvcapid.service zookeeper.service libvirtd.service ssh.service ceph.target network-online.target
Wants = pvcnoded.service
PartOf = pvc.target

View File

@ -48,7 +48,7 @@ import re
import json
# Daemon version
version = '0.9.35'
version = '0.9.37'
##########################################################
@ -217,10 +217,10 @@ def entrypoint():
# Stop console logging on all VMs
logger.out('Stopping domain console watchers', state='s')
try:
if d_domain is not None:
for domain in d_domain:
if d_domain[domain].getnode() == config['node_hostname']:
try:
d_domain[domain].console_log_instance.stop()
except Exception:
pass
@ -689,7 +689,7 @@ def entrypoint():
# Update the new list
volume_list[pool] = new_volume_list
logger.out(f'{logger.fmt_blue}Volume list [{pool}:{logger.fmt_end} {" ".join(volume_list[pool])}', state='i')
logger.out(f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}', state='i')
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(logger, config, zkhandler, this_node)

View File

@ -25,6 +25,9 @@ import psutil
import daemon_lib.common as common
from distutils.util import strtobool
from re import search
class CephOSDInstance(object):
def __init__(self, zkhandler, this_node, osd_id):
@ -66,7 +69,7 @@ class CephOSDInstance(object):
self.stats = json.loads(data)
@staticmethod
def add_osd(zkhandler, logger, node, device, weight):
def add_osd(zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
# We are ready to create a new OSD on this node
logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
try:
@ -76,7 +79,7 @@ class CephOSDInstance(object):
print('ceph osd create')
print(stdout)
print(stderr)
raise
raise Exception
osd_id = stdout.rstrip()
# 2. Remove that newly-created OSD
@ -85,7 +88,7 @@ class CephOSDInstance(object):
print('ceph osd rm')
print(stdout)
print(stderr)
raise
raise Exception
# 3a. Zap the disk to ensure it is ready to go
logger.out('Zapping disk {}'.format(device), state='i')
@ -94,21 +97,35 @@ class CephOSDInstance(object):
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
raise Exception
# 3b. Create the OSD for real
dev_flags = "--data {}".format(device)
# 3b. Prepare the logical volume if ext_db_flag
if ext_db_flag:
_, osd_size_bytes, _ = common.run_os_command('blockdev --getsize64 {}'.format(device))
osd_size_bytes = int(osd_size_bytes)
result = CephOSDInstance.create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes)
if not result:
raise Exception
db_device = "osd-db/osd-{}".format(osd_id)
dev_flags += " --block.db {}".format(db_device)
else:
db_device = ""
# 3c. Create the OSD for real
logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm prepare --bluestore --data {device}'.format(
'ceph-volume lvm prepare --bluestore {devices}'.format(
osdid=osd_id,
device=device
devices=dev_flags
)
)
if retcode:
print('ceph-volume lvm prepare')
print(stdout)
print(stderr)
raise
raise Exception
# 4a. Get OSD FSID
logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
@ -127,7 +144,7 @@ class CephOSDInstance(object):
print('Could not find OSD fsid in data:')
print(stdout)
print(stderr)
raise
raise Exception
# 4b. Activate the OSD
logger.out('Activating new OSD disk with ID {}'.format(osd_id, device), state='i')
@ -141,7 +158,7 @@ class CephOSDInstance(object):
print('ceph-volume lvm activate')
print(stdout)
print(stderr)
raise
raise Exception
# 5. Add it to the crush map
logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
@ -156,7 +173,7 @@ class CephOSDInstance(object):
print('ceph osd crush add')
print(stdout)
print(stderr)
raise
raise Exception
time.sleep(0.5)
# 6. Verify it started
@ -169,7 +186,7 @@ class CephOSDInstance(object):
print('systemctl status')
print(stdout)
print(stderr)
raise
raise Exception
# 7. Add the new OSD to the list
logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
@ -177,6 +194,7 @@ class CephOSDInstance(object):
(('osd', osd_id), ''),
(('osd.node', osd_id), node),
(('osd.device', osd_id), device),
(('osd.db_device', osd_id), db_device),
(('osd.stats', osd_id), '{}'),
])
@ -206,7 +224,7 @@ class CephOSDInstance(object):
print('ceph osd out')
print(stdout)
print(stderr)
raise
raise Exception
# 2. Wait for the OSD to flush
logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
@ -222,7 +240,7 @@ class CephOSDInstance(object):
if num_pgs > 0:
time.sleep(5)
else:
raise
raise Exception
except Exception:
break
@ -233,7 +251,7 @@ class CephOSDInstance(object):
print('systemctl stop')
print(stdout)
print(stderr)
raise
raise Exception
# FIXME: There has to be a better way to do this /shrug
while True:
@ -259,7 +277,7 @@ class CephOSDInstance(object):
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
raise Exception
# 6. Purge the OSD from Ceph
logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
@ -268,9 +286,15 @@ class CephOSDInstance(object):
print('ceph osd purge')
print(stdout)
print(stderr)
raise
raise Exception
# 7. Delete OSD from ZK
# 7. Remove the DB device
if zkhandler.exists(('osd.db_device', osd_id)):
db_device = zkhandler.read(('osd.db_device', osd_id))
logger.out('Removing OSD DB logical volume "{}"'.format(db_device), state='i')
retcode, stdout, stderr = common.run_os_command('lvremove --yes --force {}'.format(db_device))
# 8. Delete OSD from ZK
logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
zkhandler.delete(('osd', osd_id), recursive=True)
@ -282,6 +306,114 @@ class CephOSDInstance(object):
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
return False
@staticmethod
def add_db_vg(zkhandler, logger, device):
logger.out('Creating new OSD database volume group on block device {}'.format(device), state='i')
try:
# 0. Check if an existing volume group exists
retcode, stdout, stderr = common.run_os_command(
'vgdisplay osd-db'
)
if retcode != 5:
logger.out('Ceph OSD database VG "osd-db" already exists', state='e')
return False
# 1. Create an empty partition table
logger.out('Creating partitions on block device {}'.format(device), state='i')
retcode, stdout, stderr = common.run_os_command(
'sgdisk --clear {}'.format(device)
)
if retcode:
print('sgdisk create partition table')
print(stdout)
print(stderr)
raise Exception
retcode, stdout, stderr = common.run_os_command(
'sgdisk --new 1:: --typecode 1:8e00 {}'.format(device)
)
if retcode:
print('sgdisk create pv partition')
print(stdout)
print(stderr)
raise Exception
# Handle the partition ID portion
if search(r'by-path', device) or search(r'by-id', device):
# /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
partition = '{}-part1'.format(device)
elif search(r'nvme', device):
# /dev/nvme0n1 -> nvme0n1p1
partition = '{}p1'.format(device)
else:
# /dev/sda -> sda1
# No other '/dev/disk/by-*' types are valid for raw block devices anyways
partition = '{}1'.format(device)
# 2. Create the PV
logger.out('Creating PV on block device {}'.format(partition), state='i')
retcode, stdout, stderr = common.run_os_command(
'pvcreate --force {}'.format(partition)
)
if retcode:
print('pv creation')
print(stdout)
print(stderr)
raise Exception
# 2. Create the VG (named 'osd-db')
logger.out('Creating VG "osd-db" on block device {}'.format(partition), state='i')
retcode, stdout, stderr = common.run_os_command(
'vgcreate --force osd-db {}'.format(partition)
)
if retcode:
print('vg creation')
print(stdout)
print(stderr)
raise Exception
# Log it
logger.out('Created new OSD database volume group on block device {}'.format(device), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create OSD database volume group: {}'.format(e), state='e')
return False
@staticmethod
def create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes):
logger.out('Creating new OSD database logical volume for OSD ID {}'.format(osd_id), state='i')
try:
# 0. Check if an existing logical volume exists
retcode, stdout, stderr = common.run_os_command(
'lvdisplay osd-db/osd{}'.format(osd_id)
)
if retcode != 5:
logger.out('Ceph OSD database LV "osd-db/osd{}" already exists'.format(osd_id), state='e')
return False
# 1. Determine LV sizing
osd_db_size = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)
# 2. Create the LV
logger.out('Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(osd_id, osd_db_size, osd_size_bytes, ext_db_ratio), state='i')
retcode, stdout, stderr = common.run_os_command(
'lvcreate --yes --name osd-{} --size {} osd-db'.format(osd_id, osd_db_size)
)
if retcode:
print('db lv creation')
print(stdout)
print(stderr)
raise Exception
# Log it
logger.out('Created new OSD database logical volume "osd-db/osd-{}"'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create OSD database logical volume: {}'.format(e), state='e')
return False
class CephPoolInstance(object):
def __init__(self, zkhandler, this_node, name):
@ -379,13 +511,15 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
node, device, weight, ext_db_flag, ext_db_ratio = args.split(',')
ext_db_flag = bool(strtobool(ext_db_flag))
ext_db_ratio = float(ext_db_ratio)
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Add the OSD
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight)
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio)
# Command succeeded
if result:
# Update the command queue
@ -426,3 +560,27 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
])
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new DB VG
elif command == 'db_vg_add':
node, device = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Add the VG
result = CephOSDInstance.add_db_vg(zkhandler, logger, device)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure={}'.format(data))
])
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)

View File

@ -21,7 +21,7 @@
import time
from threading import Thread
from threading import Thread, Event
import daemon_lib.common as common
@ -86,6 +86,7 @@ class NodeInstance(object):
self.storage_cidrnetmask = None
# Threads
self.flush_thread = None
self.flush_event = Event()
# Flags
self.flush_stopper = False
@ -159,8 +160,8 @@ class NodeInstance(object):
if self.flush_thread is not None:
self.logger.out('Waiting for previous migration to complete'.format(self.name), state='i')
self.flush_stopper = True
while self.flush_stopper:
time.sleep(0.1)
self.flush_event.wait()
self.flush_event.clear()
# Do flushing in a thread so it doesn't block the migrates out
if self.domain_state == 'flush':
@ -679,6 +680,7 @@ class NodeInstance(object):
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node flush'.format(self.name), state='i')
self.flush_event.set()
self.flush_thread = None
self.flush_stopper = False
return
@ -711,6 +713,7 @@ class NodeInstance(object):
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
ticks = 0
self.logger.out('Waiting for migration of VM "{}"'.format(dom_uuid), state='i')
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
ticks += 1
if ticks > 600:
@ -733,6 +736,7 @@ class NodeInstance(object):
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node unflush'.format(self.name), state='i')
self.flush_event.set()
self.flush_thread = None
self.flush_stopper = False
return
@ -766,8 +770,14 @@ class NodeInstance(object):
])
# Wait for the VM to migrate back
ticks = 0
self.logger.out('Waiting for migration of VM "{}"'.format(dom_uuid), state='i')
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
time.sleep(0.1)
ticks += 1
if ticks > 600:
# Abort if we've waited 120 seconds; the VM is in a bad state, just continue
break
time.sleep(0.2)
self.zkhandler.write([
(('node.state.domain', self.name), 'ready')

View File

@ -67,6 +67,9 @@ class VMInstance(object):
self.inshutdown = False
self.instop = False
# State thread
self.state_thread = None
# Libvirt domuuid
self.dom = self.lookupByUUID(self.domuuid)
@ -83,8 +86,8 @@ class VMInstance(object):
# Perform a management command
self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
state_thread.start()
self.state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
self.state_thread.start()
# Get data functions
def getstate(self):
@ -425,42 +428,12 @@ class VMInstance(object):
migrate_lock_node.acquire()
migrate_lock_state.acquire()
time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Don't try to migrate a node to itself, set back to start
if self.node == self.lastnode or self.node == self.this_node.name:
abort_migrate('Target node matches the current active node during initial check')
return
# Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
ticks = 0
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
time.sleep(0.1)
ticks += 1
if ticks > 300:
self.logger.out('Timed out waiting 30s for peer', state='e', prefix='Domain {}'.format(self.domuuid))
aborted = True
break
self.logger.out('Releasing read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if aborted:
abort_migrate('Timed out waiting for peer')
return
# Synchronize nodes B (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
time.sleep(0.5) # Initial delay for the first writer to grab the lock
def migrate_live():
self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
@ -492,7 +465,6 @@ class VMInstance(object):
dest_lv_conn.close()
self.console_log_instance.stop()
self.removeDomainFromList()
return True
def migrate_shutdown():
@ -500,9 +472,15 @@ class VMInstance(object):
self.shutdown_vm()
return True
do_migrate_shutdown = False
self.logger.out('Acquiring lock for migration phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
try:
lock.acquire(timeout=30.0)
except Exception:
abort_migrate('Timed out waiting for peer')
return
self.logger.out('Acquired lock for migration phase B', state='o', prefix='Domain {}'.format(self.domuuid))
migrate_live_result = False
# Do a final verification
if self.node == self.lastnode or self.node == self.this_node.name:
abort_migrate('Target node matches the current active node during final check')
@ -510,7 +488,6 @@ class VMInstance(object):
if self.node != target_node:
abort_migrate('Target node changed during preparation')
return
if not force_shutdown:
# A live migrate is attempted 3 times in succession
ticks = 0
@ -525,59 +502,20 @@ class VMInstance(object):
break
else:
migrate_live_result = False
if not migrate_live_result:
if force_live:
self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid))
aborted = True
else:
do_migrate_shutdown = True
self.logger.out('Releasing write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
migrate_shutdown()
if aborted:
abort_migrate('Live migration failed and is required')
return
# Synchronize nodes C (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
if do_migrate_shutdown:
migrate_shutdown()
self.logger.out('Releasing write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
# Wait for the receive side to complete before we declare all-done and release locks
ticks = 0
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '':
time.sleep(0.1)
ticks += 1
if ticks > 100:
self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid))
break
migrate_lock_node.release()
migrate_lock_state.release()
lock.release()
self.inmigrate = False
return
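Both sides of the migration now synchronize on a single exclusive ZooKeeper lock with bounded acquires rather than the old read/write lock ping-pong. Assuming zkhandler.exclusivelock() wraps Kazoo's Lock recipe (the znode path and UUID below are illustrative), the underlying behaviour is roughly:

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

dom_uuid = '00000000-0000-0000-0000-000000000000'  # example UUID
lock = zk.Lock('/domain/{}/migrate_sync_lock'.format(dom_uuid))

try:
    # Blocks until the peer releases the lock, or raises LockTimeout after 30 s
    lock.acquire(timeout=30.0)
except LockTimeout:
    # The sender side treats this as abort_migrate('Timed out waiting for peer')
    raise

# ... perform this phase's work while holding the lock ...
lock.release()

Because only one side can hold the lock at a time, a blocking acquire doubles as the "wait for peer" step that previously required explicit polling of the sync_lock key.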
@ -592,55 +530,29 @@ class VMInstance(object):
self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid))
# Short delay to ensure sender is in sync
time.sleep(0.5)
# Ensure our lock key is populated
self.zkhandler.write([
(('domain.migrate.sync_lock', self.domuuid), self.domuuid)
])
# Synchronize nodes A (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(1) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Acquiring lock for migration phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
try:
lock.acquire(timeout=30.0)
except Exception:
self.logger.out('Failed to acquire exclusive lock for VM', state='w')
return
self.logger.out('Acquired lock for migration phase A', state='o', prefix='Domain {}'.format(self.domuuid))
# Exactly twice the amount of time that the other side is waiting
time.sleep(1)
lock.release()
self.logger.out('Released write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Acquiring lock for migration phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Acquired lock for migration phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Set the updated data
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
self.state = self.zkhandler.read(('domain.state', self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
if self.dom:
@ -672,10 +584,7 @@ class VMInstance(object):
else:
# The send failed or was aborted
self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
self.zkhandler.write([
(('domain.migrate.sync_lock', self.domuuid), '')
@ -795,6 +704,9 @@ class VMInstance(object):
else:
self.terminate_vm()
self.state_thread = None
return
# This function is a wrapper for libvirt.lookupByUUID which fixes some problems
# 1. Takes a text UUID and handles converting it to bytes
# 2. Tries it and returns a sensible value if not found
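A minimal sketch of such a wrapper, using the plain libvirt bindings rather than the daemon's own helpers (function and variable names here are illustrative), could look like:

import uuid
import libvirt

def lookup_domain_by_uuid(lv_conn, text_uuid):
    # libvirt's lookupByUUID wants the raw 16-byte UUID, so convert the text form first
    try:
        binary_uuid = uuid.UUID(text_uuid).bytes
        return lv_conn.lookupByUUID(binary_uuid)
    except (ValueError, libvirt.libvirtError):
        # Malformed UUID or no such domain on this host; return a sensible value
        return None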

View File

@ -56,8 +56,15 @@ def fence_node(node_name, zkhandler, config, logger):
# Shoot it in the head
fence_status = reboot_via_ipmi(ipmi_hostname, ipmi_username, ipmi_password, logger)
# Hold to ensure the fence takes effect and system stabilizes
time.sleep(config['keepalive_interval'] * 2)
logger.out('Waiting {}s for fence of node "{}" to take effect'.format(config['keepalive_interval'], node_name), state='i')
time.sleep(config['keepalive_interval'])
if fence_status:
logger.out('Marking node "{}" as fenced'.format(node_name), state='i')
zkhandler.write([
(('node.state.daemon', node_name), 'fenced')
])
# Force into secondary network state if needed
if node_name in config['coordinators']:
@ -119,6 +126,7 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
zkhandler.write([
(('node.state.domain', node_name), 'flushed')
])
logger.out('All VMs flushed from dead node "{}" to new hosts'.format(node_name), state='i')
#
@ -152,20 +160,20 @@ def reboot_via_ipmi(ipmi_hostname, ipmi_user, ipmi_password, logger):
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(ipmi_command_status)
if ipmi_reset_retcode == 0:
if ipmi_status_stdout == "Chassis Power is on":
if ipmi_status_stdout.strip() == "Chassis Power is on":
# We successfully rebooted the node and it is powered on; this is a successful fence
logger.out('Successfully rebooted dead node', state='o')
return True
elif ipmi_status_stdout == "Chassis Power is off":
elif ipmi_status_stdout.strip() == "Chassis Power is off":
# We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence
logger.out('Chassis power is in confirmed off state after successful IPMI reboot; proceeding with fence-flush', state='o')
return True
else:
# We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence
logger.out('Chassis power is in an unknown state after successful IPMI reboot; not performing fence-flush', state='e')
logger.out('Chassis power is in an unknown state ({}) after successful IPMI reboot; not performing fence-flush'.format(ipmi_status_stdout.strip()), state='e')
return False
else:
if ipmi_status_stdout == "Chassis Power is off":
if ipmi_status_stdout.strip() == "Chassis Power is off":
# We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence
logger.out('Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence-flush', state='o')
return True
@ -181,7 +189,7 @@ def reboot_via_ipmi(ipmi_hostname, ipmi_user, ipmi_password, logger):
def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command = f'/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status'
retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
if retcode == 0 and stdout != "Chassis Power is on":
if retcode == 0 and stdout.strip() != "Chassis Power is on":
return True
else:
return False
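The added .strip() calls matter presumably because the captured ipmitool output keeps its trailing newline, which made the old exact string comparisons fail. A small helper illustrating that normalization (the helper itself is not part of the codebase, just a sketch of the comparison logic):

def chassis_power_state(ipmi_status_stdout):
    # ipmitool prints e.g. "Chassis Power is on" followed by a newline;
    # strip surrounding whitespace before comparing so the match is reliable
    normalized = ipmi_status_stdout.strip()
    if normalized == 'Chassis Power is on':
        return 'on'
    elif normalized == 'Chassis Power is off':
        return 'off'
    else:
        return 'unknown'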

View File

@ -361,9 +361,13 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state='d', prefix='vm-thread')
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
raise Exception
except Exception:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
return
memalloc = 0
memprov = 0
@ -588,23 +592,23 @@ def node_keepalive(logger, config, zkhandler, this_node):
# Join against running threads
if config['enable_hypervisor']:
vm_stats_thread.join(timeout=4.0)
vm_stats_thread.join(timeout=config['keepalive_interval'])
if vm_stats_thread.is_alive():
logger.out('VM stats gathering exceeded 4s timeout, continuing', state='w')
logger.out('VM stats gathering exceeded timeout, continuing', state='w')
if config['enable_storage']:
ceph_stats_thread.join(timeout=4.0)
ceph_stats_thread.join(timeout=config['keepalive_interval'])
if ceph_stats_thread.is_alive():
logger.out('Ceph stats gathering exceeded 4s timeout, continuing', state='w')
logger.out('Ceph stats gathering exceeded timeout, continuing', state='w')
# Get information from thread queues
if config['enable_hypervisor']:
try:
this_node.domains_count = vm_thread_queue.get()
this_node.memalloc = vm_thread_queue.get()
this_node.memprov = vm_thread_queue.get()
this_node.vcpualloc = vm_thread_queue.get()
this_node.domains_count = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.memalloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.memprov = vm_thread_queue.get(timeout=config['keepalive_interval'])
this_node.vcpualloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
except Exception:
pass
logger.out('VM stats queue get exceeded timeout, continuing', state='w')
else:
this_node.domains_count = 0
this_node.memalloc = 0
@ -613,10 +617,11 @@ def node_keepalive(logger, config, zkhandler, this_node):
if config['enable_storage']:
try:
ceph_health_colour = ceph_thread_queue.get()
ceph_health = ceph_thread_queue.get()
osds_this_node = ceph_thread_queue.get()
ceph_health_colour = ceph_thread_queue.get(timeout=config['keepalive_interval'])
ceph_health = ceph_thread_queue.get(timeout=config['keepalive_interval'])
osds_this_node = ceph_thread_queue.get(timeout=config['keepalive_interval'])
except Exception:
logger.out('Ceph stats queue get exceeded timeout, continuing', state='w')
ceph_health_colour = logger.fmt_cyan
ceph_health = 'UNKNOWN'
osds_this_node = '?'
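Each queue read is now bounded by the keepalive interval so a stalled stats thread cannot wedge the keepalive loop, and the storage branch falls back to placeholder values when the data never arrives. The same pattern as a small generic helper (the helper name and defaults are illustrative, not part of the daemon):

import queue

def get_or_default(thread_queue, default, timeout):
    # Pull one item from a stats queue, falling back to a default if the
    # producer thread did not deliver within the keepalive interval
    try:
        return thread_queue.get(timeout=timeout)
    except queue.Empty:
        return default

For example, ceph_health = get_or_default(ceph_thread_queue, 'UNKNOWN', config['keepalive_interval']) would be equivalent to the explicit try/except block above.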