Compare commits
25 Commits
Author | SHA1 | Date | |
---|---|---|---|
23977b04fc | |||
bb1cca522f | |||
9a4dce4e4c | |||
f6f6f07488 | |||
142c999ce8 | |||
1de069298c | |||
55221b3d97 | |||
0d72798814 | |||
3638efc77e | |||
c2c888d684 | |||
febef2e406 | |||
2a4f38e933 | |||
3b805cdc34 | |||
06f0f7ed91 | |||
fd040ab45a | |||
e23e2dd9bf | |||
ee4266f8ca | |||
0f02c5eaef | |||
075abec5fe | |||
3a1cbf8d01 | |||
a438a4155a | |||
65df807b09 | |||
d0f3e9e285 | |||
adc8a5a3bc | |||
df277edf1c |
17
README.md
17
README.md
@ -55,6 +55,23 @@ While PVC's API and internals aren't very screenshot-worthy, here is some exampl
|
||||
|
||||
## Changelog
|
||||
|
||||
#### v0.9.37
|
||||
|
||||
* [All] Adds support for configurable OSD DB size ratios
|
||||
* [Node Daemon] Fixes bugs with OSD creation
|
||||
* [Node Daemon] Fixes exception bugs in CephInstance
|
||||
* [CLI Client] Adjusts descriptions around Ceph OSDs
|
||||
* [Node Daemon] Fixes ordering of pvc-flush unit
|
||||
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
|
||||
* [Node Daemon] Simplifies locking for and speeds up VM migrations
|
||||
* [Node Daemon] Fixes bugs in queue get timeouts
|
||||
* [API Daemon] Adjusts benchmark test jobs configuration and naming
|
||||
|
||||
#### v0.9.36
|
||||
|
||||
* [Node Daemon] Fixes a bug during early cleanup
|
||||
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
|
||||
|
||||
#### v0.9.35
|
||||
|
||||
* [Node Daemon] Fixes several bugs and crashes in node daemon
|
||||
|
@ -25,7 +25,7 @@ import yaml
|
||||
from distutils.util import strtobool as dustrtobool
|
||||
|
||||
# Daemon version
|
||||
version = '0.9.35'
|
||||
version = '0.9.37'
|
||||
|
||||
# API version
|
||||
API_VERSION = 1.0
|
||||
|
@ -37,12 +37,12 @@ class BenchmarkError(Exception):
|
||||
"""
|
||||
An exception that results from the Benchmark job.
|
||||
"""
|
||||
def __init__(self, message, cur_time=None, db_conn=None, db_cur=None, zkhandler=None):
|
||||
def __init__(self, message, job_name=None, db_conn=None, db_cur=None, zkhandler=None):
|
||||
self.message = message
|
||||
if cur_time is not None:
|
||||
if job_name is not None:
|
||||
# Clean up our dangling result
|
||||
query = "DELETE FROM storage_benchmarks WHERE job = %s;"
|
||||
args = (cur_time,)
|
||||
args = (job_name,)
|
||||
db_cur.execute(query, args)
|
||||
db_conn.commit()
|
||||
# Close the database connections cleanly
|
||||
@ -111,10 +111,6 @@ def run_benchmark(self, pool):
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
cur_time = datetime.now().isoformat(timespec='seconds')
|
||||
|
||||
print("Starting storage benchmark '{}' on pool '{}'".format(cur_time, pool))
|
||||
|
||||
# Phase 0 - connect to databases
|
||||
try:
|
||||
db_conn, db_cur = open_database(config)
|
||||
@ -129,14 +125,20 @@ def run_benchmark(self, pool):
|
||||
print('FATAL - failed to connect to Zookeeper')
|
||||
raise Exception
|
||||
|
||||
print("Storing running status for job '{}' in database".format(cur_time))
|
||||
cur_time = datetime.now().isoformat(timespec='seconds')
|
||||
cur_primary = zkhandler.read('base.config.primary_node')
|
||||
job_name = '{}_{}'.format(cur_time, cur_primary)
|
||||
|
||||
print("Starting storage benchmark '{}' on pool '{}'".format(job_name, pool))
|
||||
|
||||
print("Storing running status for job '{}' in database".format(job_name))
|
||||
try:
|
||||
query = "INSERT INTO storage_benchmarks (job, result) VALUES (%s, %s);"
|
||||
args = (cur_time, "Running",)
|
||||
args = (job_name, "Running",)
|
||||
db_cur.execute(query, args)
|
||||
db_conn.commit()
|
||||
except Exception as e:
|
||||
raise BenchmarkError("Failed to store running status: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
raise BenchmarkError("Failed to store running status: {}".format(e), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
|
||||
# Phase 1 - volume preparation
|
||||
self.update_state(state='RUNNING', meta={'current': 1, 'total': 3, 'status': 'Creating benchmark volume'})
|
||||
@ -147,7 +149,7 @@ def run_benchmark(self, pool):
|
||||
# Create the RBD volume
|
||||
retcode, retmsg = pvc_ceph.add_volume(zkhandler, pool, volume, "8G")
|
||||
if not retcode:
|
||||
raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
raise BenchmarkError('Failed to create volume "{}": {}'.format(volume, retmsg), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
else:
|
||||
print(retmsg)
|
||||
|
||||
@ -169,72 +171,83 @@ def run_benchmark(self, pool):
|
||||
test_matrix = {
|
||||
'seq_read': {
|
||||
'direction': 'read',
|
||||
'iodepth': '64',
|
||||
'bs': '4M',
|
||||
'rw': 'read'
|
||||
},
|
||||
'seq_write': {
|
||||
'direction': 'write',
|
||||
'iodepth': '64',
|
||||
'bs': '4M',
|
||||
'rw': 'write'
|
||||
},
|
||||
'rand_read_4M': {
|
||||
'direction': 'read',
|
||||
'iodepth': '64',
|
||||
'bs': '4M',
|
||||
'rw': 'randread'
|
||||
},
|
||||
'rand_write_4M': {
|
||||
'direction': 'write',
|
||||
'iodepth': '64',
|
||||
'bs': '4M',
|
||||
'rw': 'randwrite'
|
||||
},
|
||||
'rand_read_256K': {
|
||||
'direction': 'read',
|
||||
'bs': '256K',
|
||||
'rw': 'randread'
|
||||
},
|
||||
'rand_write_256K': {
|
||||
'direction': 'write',
|
||||
'bs': '256K',
|
||||
'rw': 'randwrite'
|
||||
},
|
||||
'rand_read_4K': {
|
||||
'direction': 'read',
|
||||
'iodepth': '64',
|
||||
'bs': '4K',
|
||||
'rw': 'randread'
|
||||
},
|
||||
'rand_write_4K': {
|
||||
'direction': 'write',
|
||||
'iodepth': '64',
|
||||
'bs': '4K',
|
||||
'rw': 'randwrite'
|
||||
}
|
||||
},
|
||||
'rand_read_4K_lowdepth': {
|
||||
'direction': 'read',
|
||||
'iodepth': '1',
|
||||
'bs': '4K',
|
||||
'rw': 'randread'
|
||||
},
|
||||
'rand_write_4K_lowdepth': {
|
||||
'direction': 'write',
|
||||
'iodepth': '1',
|
||||
'bs': '4K',
|
||||
'rw': 'randwrite'
|
||||
},
|
||||
}
|
||||
parsed_results = dict()
|
||||
for test in test_matrix:
|
||||
print("Running test '{}'".format(test))
|
||||
fio_cmd = """
|
||||
fio \
|
||||
--output-format=terse \
|
||||
--terse-version=5 \
|
||||
--name={test} \
|
||||
--ioengine=rbd \
|
||||
--pool={pool} \
|
||||
--rbdname={volume} \
|
||||
--output-format=terse \
|
||||
--terse-version=5 \
|
||||
--direct=1 \
|
||||
--randrepeat=1 \
|
||||
--iodepth=64 \
|
||||
--size=8G \
|
||||
--name={test} \
|
||||
--iodepth={iodepth} \
|
||||
--numjobs=1 \
|
||||
--time_based \
|
||||
--runtime=60 \
|
||||
--bs={bs} \
|
||||
--readwrite={rw}
|
||||
""".format(
|
||||
test=test,
|
||||
pool=pool,
|
||||
volume=volume,
|
||||
test=test,
|
||||
iodepth=test_matrix[test]['iodepth'],
|
||||
bs=test_matrix[test]['bs'],
|
||||
rw=test_matrix[test]['rw'])
|
||||
|
||||
retcode, stdout, stderr = pvc_common.run_os_command(fio_cmd)
|
||||
if retcode:
|
||||
raise BenchmarkError("Failed to run fio test: {}".format(stderr), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
raise BenchmarkError("Failed to run fio test: {}".format(stderr), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
|
||||
# Parse the terse results to avoid storing tons of junk
|
||||
# Reference: https://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
|
||||
@ -437,18 +450,18 @@ def run_benchmark(self, pool):
|
||||
# Remove the RBD volume
|
||||
retcode, retmsg = pvc_ceph.remove_volume(zkhandler, pool, volume)
|
||||
if not retcode:
|
||||
raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
raise BenchmarkError('Failed to remove volume "{}": {}'.format(volume, retmsg), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
else:
|
||||
print(retmsg)
|
||||
|
||||
print("Storing result of tests for job '{}' in database".format(cur_time))
|
||||
print("Storing result of tests for job '{}' in database".format(job_name))
|
||||
try:
|
||||
query = "UPDATE storage_benchmarks SET result = %s WHERE job = %s;"
|
||||
args = (json.dumps(parsed_results), cur_time)
|
||||
args = (json.dumps(parsed_results), job_name)
|
||||
db_cur.execute(query, args)
|
||||
db_conn.commit()
|
||||
except Exception as e:
|
||||
raise BenchmarkError("Failed to store test results: {}".format(e), cur_time=cur_time, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
raise BenchmarkError("Failed to store test results: {}".format(e), job_name=job_name, db_conn=db_conn, db_cur=db_cur, zkhandler=zkhandler)
|
||||
|
||||
close_database(db_conn, db_cur)
|
||||
zkhandler.disconnect()
|
||||
|
@ -3599,6 +3599,52 @@ class API_Storage_Ceph_Option(Resource):
|
||||
api.add_resource(API_Storage_Ceph_Option, '/storage/ceph/option')
|
||||
|
||||
|
||||
# /storage/ceph/osddb
|
||||
class API_Storage_Ceph_OSDDB_Root(Resource):
|
||||
@RequestParser([
|
||||
{'name': 'node', 'required': True, 'helptext': "A valid node must be specified."},
|
||||
{'name': 'device', 'required': True, 'helptext': "A valid device must be specified."},
|
||||
])
|
||||
@Authenticator
|
||||
def post(self, reqargs):
|
||||
"""
|
||||
Add a Ceph OSD database volume group to the cluster
|
||||
Note: This task may take up to 30s to complete and return
|
||||
---
|
||||
tags:
|
||||
- storage / ceph
|
||||
parameters:
|
||||
- in: query
|
||||
name: node
|
||||
type: string
|
||||
required: true
|
||||
description: The PVC node to create the OSD DB volume group on
|
||||
- in: query
|
||||
name: device
|
||||
type: string
|
||||
required: true
|
||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) to create the OSD DB volume group on
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
400:
|
||||
description: Bad request
|
||||
schema:
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
return api_helper.ceph_osd_db_vg_add(
|
||||
reqargs.get('node', None),
|
||||
reqargs.get('device', None)
|
||||
)
|
||||
|
||||
|
||||
api.add_resource(API_Storage_Ceph_OSDDB_Root, '/storage/ceph/osddb')
|
||||
|
||||
|
||||
# /storage/ceph/osd
|
||||
class API_Storage_Ceph_OSD_Root(Resource):
|
||||
@RequestParser([
|
||||
@ -3619,6 +3665,12 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
id:
|
||||
type: string (containing integer)
|
||||
description: The Ceph ID of the OSD
|
||||
device:
|
||||
type: string
|
||||
description: The OSD data block device
|
||||
db_device:
|
||||
type: string
|
||||
description: The OSD database/WAL block device (logical volume); empty if not applicable
|
||||
stats:
|
||||
type: object
|
||||
properties:
|
||||
@ -3698,6 +3750,8 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
{'name': 'node', 'required': True, 'helptext': "A valid node must be specified."},
|
||||
{'name': 'device', 'required': True, 'helptext': "A valid device must be specified."},
|
||||
{'name': 'weight', 'required': True, 'helptext': "An OSD weight must be specified."},
|
||||
{'name': 'ext_db', 'required': False, 'helptext': "Whether to use an external OSD DB LV device."},
|
||||
{'name': 'ext_db_ratio', 'required': False, 'helptext': "Decimal size ratio of the external OSD DB LV device."},
|
||||
])
|
||||
@Authenticator
|
||||
def post(self, reqargs):
|
||||
@ -3723,6 +3777,16 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
type: number
|
||||
required: true
|
||||
description: The Ceph CRUSH weight for the OSD
|
||||
- in: query
|
||||
name: ext_db
|
||||
type: boolean
|
||||
required: false
|
||||
description: Whether to use an external OSD DB LV device
|
||||
- in: query
|
||||
name: ext_db_ratio
|
||||
type: float
|
||||
required: false
|
||||
description: Decimal ratio of total OSD size for the external OSD DB LV device, default 0.05 (5%)
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
@ -3738,7 +3802,9 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
return api_helper.ceph_osd_add(
|
||||
reqargs.get('node', None),
|
||||
reqargs.get('device', None),
|
||||
reqargs.get('weight', None)
|
||||
reqargs.get('weight', None),
|
||||
reqargs.get('ext_db', False),
|
||||
float(reqargs.get('ext_db_ratio', 0.05)),
|
||||
)
|
||||
|
||||
|
||||
|
@ -1274,11 +1274,29 @@ def ceph_osd_state(zkhandler, osd):
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def ceph_osd_add(zkhandler, node, device, weight):
|
||||
def ceph_osd_db_vg_add(zkhandler, node, device):
|
||||
"""
|
||||
Add a Ceph OSD database VG to the PVC Ceph storage cluster.
|
||||
"""
|
||||
retflag, retdata = pvc_ceph.add_osd_db_vg(zkhandler, node, device)
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
|
||||
output = {
|
||||
'message': retdata.replace('\"', '\'')
|
||||
}
|
||||
return output, retcode
|
||||
|
||||
|
||||
@ZKConnection(config)
|
||||
def ceph_osd_add(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
|
||||
"""
|
||||
Add a Ceph OSD to the PVC Ceph storage cluster.
|
||||
"""
|
||||
retflag, retdata = pvc_ceph.add_osd(zkhandler, node, device, weight)
|
||||
retflag, retdata = pvc_ceph.add_osd(zkhandler, node, device, weight, ext_db_flag, ext_db_ratio)
|
||||
|
||||
if retflag:
|
||||
retcode = 200
|
||||
|
@ -149,6 +149,31 @@ def format_raw_output(status_data):
|
||||
return '\n'.join(ainformation)
|
||||
|
||||
|
||||
#
|
||||
# OSD DB VG functions
|
||||
#
|
||||
def ceph_osd_db_vg_add(config, node, device):
|
||||
"""
|
||||
Add new Ceph OSD database volume group
|
||||
|
||||
API endpoint: POST /api/v1/storage/ceph/osddb
|
||||
API arguments: node={node}, device={device}
|
||||
API schema: {"message":"{data}"}
|
||||
"""
|
||||
params = {
|
||||
'node': node,
|
||||
'device': device
|
||||
}
|
||||
response = call_api(config, 'post', '/storage/ceph/osddb', params=params)
|
||||
|
||||
if response.status_code == 200:
|
||||
retstatus = True
|
||||
else:
|
||||
retstatus = False
|
||||
|
||||
return retstatus, response.json().get('message', '')
|
||||
|
||||
|
||||
#
|
||||
# OSD functions
|
||||
#
|
||||
@ -197,18 +222,20 @@ def ceph_osd_list(config, limit):
|
||||
return False, response.json().get('message', '')
|
||||
|
||||
|
||||
def ceph_osd_add(config, node, device, weight):
|
||||
def ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio):
|
||||
"""
|
||||
Add new Ceph OSD
|
||||
|
||||
API endpoint: POST /api/v1/storage/ceph/osd
|
||||
API arguments: node={node}, device={device}, weight={weight}
|
||||
API arguments: node={node}, device={device}, weight={weight}, ext_db={ext_db_flag}, ext_db_ratio={ext_db_ratio}
|
||||
API schema: {"message":"{data}"}
|
||||
"""
|
||||
params = {
|
||||
'node': node,
|
||||
'device': device,
|
||||
'weight': weight
|
||||
'weight': weight,
|
||||
'ext_db': ext_db_flag,
|
||||
'ext_db_ratio': ext_db_ratio
|
||||
}
|
||||
response = call_api(config, 'post', '/storage/ceph/osd', params=params)
|
||||
|
||||
@ -312,13 +339,15 @@ def format_list_osd(osd_list):
|
||||
osd_list_output = []
|
||||
|
||||
osd_id_length = 3
|
||||
osd_node_length = 5
|
||||
osd_device_length = 6
|
||||
osd_db_device_length = 9
|
||||
osd_up_length = 4
|
||||
osd_in_length = 4
|
||||
osd_size_length = 5
|
||||
osd_weight_length = 3
|
||||
osd_reweight_length = 5
|
||||
osd_pgs_length = 4
|
||||
osd_node_length = 5
|
||||
osd_used_length = 5
|
||||
osd_free_length = 6
|
||||
osd_util_length = 6
|
||||
@ -358,10 +387,21 @@ def format_list_osd(osd_list):
|
||||
if _osd_id_length > osd_id_length:
|
||||
osd_id_length = _osd_id_length
|
||||
|
||||
# Set the OSD node length
|
||||
_osd_node_length = len(osd_information['stats']['node']) + 1
|
||||
if _osd_node_length > osd_node_length:
|
||||
osd_node_length = _osd_node_length
|
||||
|
||||
# Set the OSD device length
|
||||
_osd_device_length = len(osd_information['device']) + 1
|
||||
if _osd_device_length > osd_device_length:
|
||||
osd_device_length = _osd_device_length
|
||||
|
||||
# Set the OSD db_device length
|
||||
_osd_db_device_length = len(osd_information['db_device']) + 1
|
||||
if _osd_db_device_length > osd_db_device_length:
|
||||
osd_db_device_length = _osd_db_device_length
|
||||
|
||||
# Set the size and length
|
||||
_osd_size_length = len(str(osd_information['stats']['size'])) + 1
|
||||
if _osd_size_length > osd_size_length:
|
||||
@ -422,12 +462,12 @@ def format_list_osd(osd_list):
|
||||
osd_list_output.append('{bold}{osd_header: <{osd_header_length}} {state_header: <{state_header_length}} {details_header: <{details_header_length}} {read_header: <{read_header_length}} {write_header: <{write_header_length}}{end_bold}'.format(
|
||||
bold=ansiprint.bold(),
|
||||
end_bold=ansiprint.end(),
|
||||
osd_header_length=osd_id_length + osd_node_length + 1,
|
||||
osd_header_length=osd_id_length + osd_node_length + osd_device_length + osd_db_device_length + 3,
|
||||
state_header_length=osd_up_length + osd_in_length + 1,
|
||||
details_header_length=osd_size_length + osd_pgs_length + osd_weight_length + osd_reweight_length + osd_used_length + osd_free_length + osd_util_length + osd_var_length + 7,
|
||||
read_header_length=osd_rdops_length + osd_rddata_length + 1,
|
||||
write_header_length=osd_wrops_length + osd_wrdata_length + 1,
|
||||
osd_header='OSDs ' + ''.join(['-' for _ in range(5, osd_id_length + osd_node_length)]),
|
||||
osd_header='OSDs ' + ''.join(['-' for _ in range(5, osd_id_length + osd_node_length + osd_device_length + osd_db_device_length + 2)]),
|
||||
state_header='State ' + ''.join(['-' for _ in range(6, osd_up_length + osd_in_length)]),
|
||||
details_header='Details ' + ''.join(['-' for _ in range(8, osd_size_length + osd_pgs_length + osd_weight_length + osd_reweight_length + osd_used_length + osd_free_length + osd_util_length + osd_var_length + 6)]),
|
||||
read_header='Read ' + ''.join(['-' for _ in range(5, osd_rdops_length + osd_rddata_length)]),
|
||||
@ -437,6 +477,8 @@ def format_list_osd(osd_list):
|
||||
osd_list_output.append('{bold}\
|
||||
{osd_id: <{osd_id_length}} \
|
||||
{osd_node: <{osd_node_length}} \
|
||||
{osd_device: <{osd_device_length}} \
|
||||
{osd_db_device: <{osd_db_device_length}} \
|
||||
{osd_up: <{osd_up_length}} \
|
||||
{osd_in: <{osd_in_length}} \
|
||||
{osd_size: <{osd_size_length}} \
|
||||
@ -456,6 +498,8 @@ def format_list_osd(osd_list):
|
||||
end_bold=ansiprint.end(),
|
||||
osd_id_length=osd_id_length,
|
||||
osd_node_length=osd_node_length,
|
||||
osd_device_length=osd_device_length,
|
||||
osd_db_device_length=osd_db_device_length,
|
||||
osd_up_length=osd_up_length,
|
||||
osd_in_length=osd_in_length,
|
||||
osd_size_length=osd_size_length,
|
||||
@ -472,6 +516,8 @@ def format_list_osd(osd_list):
|
||||
osd_rddata_length=osd_rddata_length,
|
||||
osd_id='ID',
|
||||
osd_node='Node',
|
||||
osd_device='Block',
|
||||
osd_db_device='DB Block',
|
||||
osd_up='Up',
|
||||
osd_in='In',
|
||||
osd_size='Size',
|
||||
@ -500,10 +546,16 @@ def format_list_osd(osd_list):
|
||||
osd_util = round(osd_information['stats']['utilization'], 2)
|
||||
osd_var = round(osd_information['stats']['var'], 2)
|
||||
|
||||
osd_db_device = osd_information['db_device']
|
||||
if not osd_db_device:
|
||||
osd_db_device = 'N/A'
|
||||
|
||||
# Format the output header
|
||||
osd_list_output.append('{bold}\
|
||||
{osd_id: <{osd_id_length}} \
|
||||
{osd_node: <{osd_node_length}} \
|
||||
{osd_device: <{osd_device_length}} \
|
||||
{osd_db_device: <{osd_db_device_length}} \
|
||||
{osd_up_colour}{osd_up: <{osd_up_length}}{end_colour} \
|
||||
{osd_in_colour}{osd_in: <{osd_in_length}}{end_colour} \
|
||||
{osd_size: <{osd_size_length}} \
|
||||
@ -524,6 +576,8 @@ def format_list_osd(osd_list):
|
||||
end_colour=ansiprint.end(),
|
||||
osd_id_length=osd_id_length,
|
||||
osd_node_length=osd_node_length,
|
||||
osd_device_length=osd_device_length,
|
||||
osd_db_device_length=osd_db_device_length,
|
||||
osd_up_length=osd_up_length,
|
||||
osd_in_length=osd_in_length,
|
||||
osd_size_length=osd_size_length,
|
||||
@ -540,6 +594,8 @@ def format_list_osd(osd_list):
|
||||
osd_rddata_length=osd_rddata_length,
|
||||
osd_id=osd_information['id'],
|
||||
osd_node=osd_information['stats']['node'],
|
||||
osd_device=osd_information['device'],
|
||||
osd_db_device=osd_db_device,
|
||||
osd_up_colour=osd_up_colour,
|
||||
osd_up=osd_up_flag,
|
||||
osd_in_colour=osd_in_colour,
|
||||
@ -1550,10 +1606,10 @@ def format_info_benchmark(config, benchmark_information):
|
||||
"seq_write": "Sequential Write (4M blocks)",
|
||||
"rand_read_4M": "Random Read (4M blocks)",
|
||||
"rand_write_4M": "Random Write (4M blocks)",
|
||||
"rand_read_256K": "Random Read (256K blocks)",
|
||||
"rand_write_256K": "Random Write (256K blocks)",
|
||||
"rand_read_4K": "Random Read (4K blocks)",
|
||||
"rand_write_4K": "Random Write (4K blocks)"
|
||||
"rand_write_4K": "Random Write (4K blocks)",
|
||||
"rand_read_4K_lowdepth": "Random Read (4K blocks, single-queue)",
|
||||
"rand_write_4K_lowdepth": "Random Write (4K blocks, single-queue)",
|
||||
}
|
||||
|
||||
test_name_length = 30
|
||||
@ -1566,7 +1622,16 @@ def format_info_benchmark(config, benchmark_information):
|
||||
cpuutil_label_length = 11
|
||||
cpuutil_column_length = 9
|
||||
|
||||
# Work around old results that did not have these tests
|
||||
if 'rand_read_4K_lowdepth' not in benchmark_details:
|
||||
del nice_test_name_map['rand_read_4K_lowdepth']
|
||||
del nice_test_name_map['rand_write_4K_lowdepth']
|
||||
|
||||
for test in benchmark_details:
|
||||
# Work around old results that had these obsolete tests
|
||||
if test == 'rand_read_256K' or test == 'rand_write_256K':
|
||||
continue
|
||||
|
||||
_test_name_length = len(nice_test_name_map[test])
|
||||
if _test_name_length > test_name_length:
|
||||
test_name_length = _test_name_length
|
||||
@ -1603,6 +1668,10 @@ def format_info_benchmark(config, benchmark_information):
|
||||
cpuutil_column_length = _element_length
|
||||
|
||||
for test in benchmark_details:
|
||||
# Work around old results that had these obsolete tests
|
||||
if test == 'rand_read_256K' or test == 'rand_write_256K':
|
||||
continue
|
||||
|
||||
ainformation.append('')
|
||||
|
||||
test_details = benchmark_details[test]
|
||||
|
@ -2526,7 +2526,7 @@ def ceph_benchmark_run(pool):
|
||||
Run a storage benchmark on POOL in the background.
|
||||
"""
|
||||
try:
|
||||
click.confirm('NOTE: Storage benchmarks generate significant load on the cluster and can take a very long time to complete on slow storage. They should be run sparingly. Continue', prompt_suffix='? ', abort=True)
|
||||
click.confirm('NOTE: Storage benchmarks take approximately 8 minutes to run and generate significant load on the storage cluster; they should be run sparingly. Continue', prompt_suffix='? ', abort=True)
|
||||
except Exception:
|
||||
exit(0)
|
||||
|
||||
@ -2583,6 +2583,38 @@ def ceph_osd():
|
||||
pass
|
||||
|
||||
|
||||
###############################################################################
|
||||
# pvc storage osd create-db-vg
|
||||
###############################################################################
|
||||
@click.command(name='create-db-vg', short_help='Create new OSD database volume group.')
|
||||
@click.argument(
|
||||
'node'
|
||||
)
|
||||
@click.argument(
|
||||
'device'
|
||||
)
|
||||
@click.option(
|
||||
'-y', '--yes', 'confirm_flag',
|
||||
is_flag=True, default=False,
|
||||
help='Confirm the creation.'
|
||||
)
|
||||
@cluster_req
|
||||
def ceph_osd_create_db_vg(node, device, confirm_flag):
|
||||
"""
|
||||
Create a new Ceph OSD database volume group on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
|
||||
|
||||
This volume group will be used for Ceph OSD database and WAL functionality if the '--ext-db' flag is passed to newly-created OSDs during 'pvc storage osd add'. DEVICE should be an extremely fast SSD device (NVMe, Intel Optane, etc.) which is significantly faster than the normal OSD disks and with very high write endurance. Only one OSD database volume group on a single physical device is supported per node, so it must be fast and large enough to act as an effective OSD database device for all OSDs on the node. Attempting to add additional database volume groups after the first will fail.
|
||||
"""
|
||||
if not confirm_flag and not config['unsafe']:
|
||||
try:
|
||||
click.confirm('Destroy all data and create a new OSD database volume group on {}:{}'.format(node, device), prompt_suffix='? ', abort=True)
|
||||
except Exception:
|
||||
exit(0)
|
||||
|
||||
retcode, retmsg = pvc_ceph.ceph_osd_db_vg_add(config, node, device)
|
||||
cleanup(retcode, retmsg)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# pvc storage osd add
|
||||
###############################################################################
|
||||
@ -2598,15 +2630,31 @@ def ceph_osd():
|
||||
default=1.0, show_default=True,
|
||||
help='Weight of the OSD within the CRUSH map.'
|
||||
)
|
||||
@click.option(
|
||||
'-d', '--ext-db', 'ext_db_flag',
|
||||
is_flag=True, default=False,
|
||||
help='Use an external database logical volume for this OSD.'
|
||||
)
|
||||
@click.option(
|
||||
'-r', '--ext-db-ratio', 'ext_db_ratio',
|
||||
default=0.05, show_default=True, type=float,
|
||||
help='Decimal ratio of the external database logical volume to the OSD size.'
|
||||
)
|
||||
@click.option(
|
||||
'-y', '--yes', 'confirm_flag',
|
||||
is_flag=True, default=False,
|
||||
help='Confirm the removal'
|
||||
help='Confirm the creation.'
|
||||
)
|
||||
@cluster_req
|
||||
def ceph_osd_add(node, device, weight, confirm_flag):
|
||||
def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag):
|
||||
"""
|
||||
Add a new Ceph OSD on node NODE with block device DEVICE.
|
||||
Add a new Ceph OSD on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
|
||||
|
||||
The weight of an OSD should reflect the ratio of the OSD to other OSDs in the storage cluster. For example, if all OSDs are the same size as recommended for PVC, 1 (the default) is a valid weight so that all are treated identically. If a new OSD is added later which is 4x the size of the existing OSDs, the new OSD's weight should then be 4 to tell the cluster that 4x the data can be stored on the OSD. Weights can also be tweaked for performance reasons, since OSDs with more data will incur more I/O load. For more information about CRUSH weights, please see the Ceph documentation.
|
||||
|
||||
If '--ext-db' is specified, the OSD database and WAL will be placed on a new logical volume in NODE's OSD database volume group; it must exist or OSD creation will fail. See the 'pvc storage osd create-db-vg' command for more details.
|
||||
|
||||
The default '--ext-db-ratio' of 0.05 (5%) is sufficient for most RBD workloads and OSD sizes, though this can be adjusted based on the sizes of the OSD(s) and the underlying database device. Ceph documentation recommends at least 0.02 (2%) for RBD use-cases, and higher values may improve WAL performance under write-heavy workloads with fewer OSDs per node.
|
||||
"""
|
||||
if not confirm_flag and not config['unsafe']:
|
||||
try:
|
||||
@ -2614,7 +2662,7 @@ def ceph_osd_add(node, device, weight, confirm_flag):
|
||||
except Exception:
|
||||
exit(0)
|
||||
|
||||
retcode, retmsg = pvc_ceph.ceph_osd_add(config, node, device, weight)
|
||||
retcode, retmsg = pvc_ceph.ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio)
|
||||
cleanup(retcode, retmsg)
|
||||
|
||||
|
||||
@ -2635,7 +2683,7 @@ def ceph_osd_remove(osdid, confirm_flag):
|
||||
"""
|
||||
Remove a Ceph OSD with ID OSDID.
|
||||
|
||||
DANGER: This will completely remove the OSD from the cluster. OSDs will rebalance which may negatively affect performance or available space.
|
||||
DANGER: This will completely remove the OSD from the cluster. OSDs will rebalance which will negatively affect performance and available space. It is STRONGLY RECOMMENDED to set an OSD out (using 'pvc storage osd out') and allow the cluster to fully rebalance (verified with 'pvc storage status') before removing an OSD.
|
||||
"""
|
||||
if not confirm_flag and not config['unsafe']:
|
||||
try:
|
||||
@ -4856,6 +4904,7 @@ ceph_benchmark.add_command(ceph_benchmark_run)
|
||||
ceph_benchmark.add_command(ceph_benchmark_info)
|
||||
ceph_benchmark.add_command(ceph_benchmark_list)
|
||||
|
||||
ceph_osd.add_command(ceph_osd_create_db_vg)
|
||||
ceph_osd.add_command(ceph_osd_add)
|
||||
ceph_osd.add_command(ceph_osd_remove)
|
||||
ceph_osd.add_command(ceph_osd_in)
|
||||
|
@ -2,7 +2,7 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='pvc',
|
||||
version='0.9.35',
|
||||
version='0.9.37',
|
||||
packages=['pvc', 'pvc.cli_lib'],
|
||||
install_requires=[
|
||||
'Click',
|
||||
|
@ -180,20 +180,63 @@ def getClusterOSDList(zkhandler):
|
||||
|
||||
|
||||
def getOSDInformation(zkhandler, osd_id):
|
||||
# Get the devices
|
||||
osd_device = zkhandler.read(('osd.device', osd_id))
|
||||
osd_db_device = zkhandler.read(('osd.db_device', osd_id))
|
||||
# Parse the stats data
|
||||
osd_stats_raw = zkhandler.read(('osd.stats', osd_id))
|
||||
osd_stats = dict(json.loads(osd_stats_raw))
|
||||
|
||||
osd_information = {
|
||||
'id': osd_id,
|
||||
'stats': osd_stats
|
||||
'device': osd_device,
|
||||
'db_device': osd_db_device,
|
||||
'stats': osd_stats,
|
||||
}
|
||||
return osd_information
|
||||
|
||||
|
||||
# OSD DB VG actions use the /cmd/ceph pipe
|
||||
# These actions must occur on the specific node they reference
|
||||
def add_osd_db_vg(zkhandler, node, device):
|
||||
# Verify the target node exists
|
||||
if not common.verifyNode(zkhandler, node):
|
||||
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
|
||||
|
||||
# Tell the cluster to create a new OSD for the host
|
||||
add_osd_db_vg_string = 'db_vg_add {},{}'.format(node, device)
|
||||
zkhandler.write([
|
||||
('base.cmd.ceph', add_osd_db_vg_string)
|
||||
])
|
||||
# Wait 1/2 second for the cluster to get the message and start working
|
||||
time.sleep(0.5)
|
||||
# Acquire a read lock, so we get the return exclusively
|
||||
with zkhandler.readlock('base.cmd.ceph'):
|
||||
try:
|
||||
result = zkhandler.read('base.cmd.ceph').split()[0]
|
||||
if result == 'success-db_vg_add':
|
||||
message = 'Created new OSD database VG at "{}" on node "{}".'.format(device, node)
|
||||
success = True
|
||||
else:
|
||||
message = 'ERROR: Failed to create new OSD database VG; check node logs for details.'
|
||||
success = False
|
||||
except Exception:
|
||||
message = 'ERROR: Command ignored by node.'
|
||||
success = False
|
||||
|
||||
# Acquire a write lock to ensure things go smoothly
|
||||
with zkhandler.writelock('base.cmd.ceph'):
|
||||
time.sleep(0.5)
|
||||
zkhandler.write([
|
||||
('base.cmd.ceph', '')
|
||||
])
|
||||
|
||||
return success, message
|
||||
|
||||
|
||||
# OSD addition and removal uses the /cmd/ceph pipe
|
||||
# These actions must occur on the specific node they reference
|
||||
def add_osd(zkhandler, node, device, weight):
|
||||
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
|
||||
# Verify the target node exists
|
||||
if not common.verifyNode(zkhandler, node):
|
||||
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
|
||||
@ -204,7 +247,7 @@ def add_osd(zkhandler, node, device, weight):
|
||||
return False, 'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(device, node, block_osd)
|
||||
|
||||
# Tell the cluster to create a new OSD for the host
|
||||
add_osd_string = 'osd_add {},{},{}'.format(node, device, weight)
|
||||
add_osd_string = 'osd_add {},{},{},{},{}'.format(node, device, weight, ext_db_flag, ext_db_ratio)
|
||||
zkhandler.write([
|
||||
('base.cmd.ceph', add_osd_string)
|
||||
])
|
||||
|
1
daemon-common/migrations/versions/5.json
Normal file
1
daemon-common/migrations/versions/5.json
Normal file
@ -0,0 +1 @@
|
||||
{"version": "5", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}
|
@ -466,7 +466,7 @@ class ZKHandler(object):
|
||||
#
|
||||
class ZKSchema(object):
|
||||
# Current version
|
||||
_version = 4
|
||||
_version = 5
|
||||
|
||||
# Root for doing nested keys
|
||||
_schema_root = ''
|
||||
@ -636,6 +636,7 @@ class ZKSchema(object):
|
||||
'id': '', # The root key
|
||||
'node': '/node',
|
||||
'device': '/device',
|
||||
'db_device': '/db_device',
|
||||
'stats': '/stats'
|
||||
},
|
||||
# The schema of an individual pool entry (/ceph/pools/{pool_name})
|
||||
|
21
debian/changelog
vendored
21
debian/changelog
vendored
@ -1,3 +1,24 @@
|
||||
pvc (0.9.37-0) unstable; urgency=high
|
||||
|
||||
* [All] Adds support for configurable OSD DB size ratios
|
||||
* [Node Daemon] Fixes bugs with OSD creation
|
||||
* [Node Daemon] Fixes exception bugs in CephInstance
|
||||
* [CLI Client] Adjusts descriptions around Ceph OSDs
|
||||
* [Node Daemon] Fixes ordering of pvc-flush unit
|
||||
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
|
||||
* [Node Daemon] Simplifies locking for and speeds up VM migrations
|
||||
* [Node Daemon] Fixes bugs in queue get timeouts
|
||||
* [API Daemon] Adjusts benchmark test jobs configuration and naming
|
||||
|
||||
-- Joshua M. Boniface <joshua@boniface.me> Thu, 30 Sep 2021 02:02:53 -0400
|
||||
|
||||
pvc (0.9.36-0) unstable; urgency=high
|
||||
|
||||
* [Node Daemon] Fixes a bug during early cleanup
|
||||
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
|
||||
|
||||
-- Joshua M. Boniface <joshua@boniface.me> Thu, 23 Sep 2021 14:01:38 -0400
|
||||
|
||||
pvc (0.9.35-0) unstable; urgency=high
|
||||
|
||||
* [Node Daemon] Fixes several bugs and crashes in node daemon
|
||||
|
@ -55,6 +55,23 @@ While PVC's API and internals aren't very screenshot-worthy, here is some exampl
|
||||
|
||||
## Changelog
|
||||
|
||||
#### v0.9.37
|
||||
|
||||
* [All] Adds support for configurable OSD DB size ratios
|
||||
* [Node Daemon] Fixes bugs with OSD creation
|
||||
* [Node Daemon] Fixes exception bugs in CephInstance
|
||||
* [CLI Client] Adjusts descriptions around Ceph OSDs
|
||||
* [Node Daemon] Fixes ordering of pvc-flush unit
|
||||
* [Node Daemon] Fixes bugs in fence handling and libvirt keepalive
|
||||
* [Node Daemon] Simplifies locking for and speeds up VM migrations
|
||||
* [Node Daemon] Fixes bugs in queue get timeouts
|
||||
* [API Daemon] Adjusts benchmark test jobs configuration and naming
|
||||
|
||||
#### v0.9.36
|
||||
|
||||
* [Node Daemon] Fixes a bug during early cleanup
|
||||
* [All] Adds support for OSD database/WAL block devices to improve Ceph performance; NOTE: Applies only to new OSDs
|
||||
|
||||
#### v0.9.35
|
||||
|
||||
* [Node Daemon] Fixes several bugs and crashes in node daemon
|
||||
|
@ -520,6 +520,14 @@
|
||||
},
|
||||
"osd": {
|
||||
"properties": {
|
||||
"db_device": {
|
||||
"description": "The OSD database/WAL block device (logical volume); empty if not applicable",
|
||||
"type": "string"
|
||||
},
|
||||
"device": {
|
||||
"description": "The OSD data block device",
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"description": "The Ceph ID of the OSD",
|
||||
"type": "string (containing integer)"
|
||||
@ -5011,6 +5019,20 @@
|
||||
"name": "weight",
|
||||
"required": true,
|
||||
"type": "number"
|
||||
},
|
||||
{
|
||||
"description": "Whether to use an external OSD DB LV device",
|
||||
"in": "query",
|
||||
"name": "ext_db",
|
||||
"required": false,
|
||||
"type": "boolean"
|
||||
},
|
||||
{
|
||||
"description": "Decimal ratio of total OSD size for the external OSD DB LV device, default 0.05 (5%)",
|
||||
"in": "query",
|
||||
"name": "ext_db_ratio",
|
||||
"required": false,
|
||||
"type": "float"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
@ -5133,6 +5155,45 @@
|
||||
]
|
||||
}
|
||||
},
|
||||
"/api/v1/storage/ceph/osddb": {
|
||||
"post": {
|
||||
"description": "Note: This task may take up to 30s to complete and return",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "The PVC node to create the OSD DB volume group on",
|
||||
"in": "query",
|
||||
"name": "node",
|
||||
"required": true,
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) to create the OSD DB volume group on",
|
||||
"in": "query",
|
||||
"name": "device",
|
||||
"required": true,
|
||||
"type": "string"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
}
|
||||
},
|
||||
"summary": "Add a Ceph OSD database volume group to the cluster",
|
||||
"tags": [
|
||||
"storage / ceph"
|
||||
]
|
||||
}
|
||||
},
|
||||
"/api/v1/storage/ceph/pool": {
|
||||
"get": {
|
||||
"description": "",
|
||||
@ -6248,6 +6309,68 @@
|
||||
]
|
||||
}
|
||||
},
|
||||
"/api/v1/vm/{vm}/device": {
|
||||
"delete": {
|
||||
"description": "",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "The raw Libvirt XML definition of the device to detach",
|
||||
"in": "query",
|
||||
"name": "xml",
|
||||
"required": true,
|
||||
"type": "string"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
}
|
||||
},
|
||||
"summary": "Hot-detach device XML to {vm}",
|
||||
"tags": [
|
||||
"vm"
|
||||
]
|
||||
},
|
||||
"post": {
|
||||
"description": "",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "The raw Libvirt XML definition of the device to attach",
|
||||
"in": "query",
|
||||
"name": "xml",
|
||||
"required": true,
|
||||
"type": "string"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Message"
|
||||
}
|
||||
}
|
||||
},
|
||||
"summary": "Hot-attach device XML to {vm}",
|
||||
"tags": [
|
||||
"vm"
|
||||
]
|
||||
}
|
||||
},
|
||||
"/api/v1/vm/{vm}/locks": {
|
||||
"post": {
|
||||
"description": "",
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
[Unit]
|
||||
Description = Parallel Virtual Cluster autoflush daemon
|
||||
After = pvcnoded.service pvcapid.service zookeeper.service libvirtd.service ssh.service ceph.target
|
||||
After = pvcnoded.service pvcapid.service zookeeper.service libvirtd.service ssh.service ceph.target network-online.target
|
||||
Wants = pvcnoded.service
|
||||
PartOf = pvc.target
|
||||
|
||||
|
@ -48,7 +48,7 @@ import re
|
||||
import json
|
||||
|
||||
# Daemon version
|
||||
version = '0.9.35'
|
||||
version = '0.9.37'
|
||||
|
||||
|
||||
##########################################################
|
||||
@ -217,13 +217,13 @@ def entrypoint():
|
||||
|
||||
# Stop console logging on all VMs
|
||||
logger.out('Stopping domain console watchers', state='s')
|
||||
if d_domain is not None:
|
||||
for domain in d_domain:
|
||||
if d_domain[domain].getnode() == config['node_hostname']:
|
||||
try:
|
||||
try:
|
||||
if d_domain is not None:
|
||||
for domain in d_domain:
|
||||
if d_domain[domain].getnode() == config['node_hostname']:
|
||||
d_domain[domain].console_log_instance.stop()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Force into secondary coordinator state if needed
|
||||
try:
|
||||
@ -689,7 +689,7 @@ def entrypoint():
|
||||
|
||||
# Update the new list
|
||||
volume_list[pool] = new_volume_list
|
||||
logger.out(f'{logger.fmt_blue}Volume list [{pool}:{logger.fmt_end} {" ".join(volume_list[pool])}', state='i')
|
||||
logger.out(f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}', state='i')
|
||||
|
||||
# Start keepalived thread
|
||||
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(logger, config, zkhandler, this_node)
|
||||
|
@ -25,6 +25,9 @@ import psutil
|
||||
|
||||
import daemon_lib.common as common
|
||||
|
||||
from distutils.util import strtobool
|
||||
from re import search
|
||||
|
||||
|
||||
class CephOSDInstance(object):
|
||||
def __init__(self, zkhandler, this_node, osd_id):
|
||||
@ -66,7 +69,7 @@ class CephOSDInstance(object):
|
||||
self.stats = json.loads(data)
|
||||
|
||||
@staticmethod
|
||||
def add_osd(zkhandler, logger, node, device, weight):
|
||||
def add_osd(zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
|
||||
# We are ready to create a new OSD on this node
|
||||
logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
|
||||
try:
|
||||
@ -76,7 +79,7 @@ class CephOSDInstance(object):
|
||||
print('ceph osd create')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
osd_id = stdout.rstrip()
|
||||
|
||||
# 2. Remove that newly-created OSD
|
||||
@ -85,7 +88,7 @@ class CephOSDInstance(object):
|
||||
print('ceph osd rm')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 3a. Zap the disk to ensure it is ready to go
|
||||
logger.out('Zapping disk {}'.format(device), state='i')
|
||||
@ -94,21 +97,35 @@ class CephOSDInstance(object):
|
||||
print('ceph-volume lvm zap')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 3b. Create the OSD for real
|
||||
dev_flags = "--data {}".format(device)
|
||||
|
||||
# 3b. Prepare the logical volume if ext_db_flag
|
||||
if ext_db_flag:
|
||||
_, osd_size_bytes, _ = common.run_os_command('blockdev --getsize64 {}'.format(device))
|
||||
osd_size_bytes = int(osd_size_bytes)
|
||||
result = CephOSDInstance.create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes)
|
||||
if not result:
|
||||
raise Exception
|
||||
db_device = "osd-db/osd-{}".format(osd_id)
|
||||
dev_flags += " --block.db {}".format(db_device)
|
||||
else:
|
||||
db_device = ""
|
||||
|
||||
# 3c. Create the OSD for real
|
||||
logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'ceph-volume lvm prepare --bluestore --data {device}'.format(
|
||||
'ceph-volume lvm prepare --bluestore {devices}'.format(
|
||||
osdid=osd_id,
|
||||
device=device
|
||||
devices=dev_flags
|
||||
)
|
||||
)
|
||||
if retcode:
|
||||
print('ceph-volume lvm prepare')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 4a. Get OSD FSID
|
||||
logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
|
||||
@ -127,7 +144,7 @@ class CephOSDInstance(object):
|
||||
print('Could not find OSD fsid in data:')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 4b. Activate the OSD
|
||||
logger.out('Activating new OSD disk with ID {}'.format(osd_id, device), state='i')
|
||||
@ -141,7 +158,7 @@ class CephOSDInstance(object):
|
||||
print('ceph-volume lvm activate')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 5. Add it to the crush map
|
||||
logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
|
||||
@ -156,7 +173,7 @@ class CephOSDInstance(object):
|
||||
print('ceph osd crush add')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
time.sleep(0.5)
|
||||
|
||||
# 6. Verify it started
|
||||
@ -169,7 +186,7 @@ class CephOSDInstance(object):
|
||||
print('systemctl status')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 7. Add the new OSD to the list
|
||||
logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
|
||||
@ -177,6 +194,7 @@ class CephOSDInstance(object):
|
||||
(('osd', osd_id), ''),
|
||||
(('osd.node', osd_id), node),
|
||||
(('osd.device', osd_id), device),
|
||||
(('osd.db_device', osd_id), db_device),
|
||||
(('osd.stats', osd_id), '{}'),
|
||||
])
|
||||
|
||||
@ -206,7 +224,7 @@ class CephOSDInstance(object):
|
||||
print('ceph osd out')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 2. Wait for the OSD to flush
|
||||
logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
|
||||
@ -222,7 +240,7 @@ class CephOSDInstance(object):
|
||||
if num_pgs > 0:
|
||||
time.sleep(5)
|
||||
else:
|
||||
raise
|
||||
raise Exception
|
||||
except Exception:
|
||||
break
|
||||
|
||||
@ -233,7 +251,7 @@ class CephOSDInstance(object):
|
||||
print('systemctl stop')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# FIXME: There has to be a better way to do this /shrug
|
||||
while True:
|
||||
@ -259,7 +277,7 @@ class CephOSDInstance(object):
|
||||
print('ceph-volume lvm zap')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 6. Purge the OSD from Ceph
|
||||
logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
|
||||
@ -268,9 +286,15 @@ class CephOSDInstance(object):
|
||||
print('ceph osd purge')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
raise Exception
|
||||
|
||||
# 7. Delete OSD from ZK
|
||||
# 7. Remove the DB device
|
||||
if zkhandler.exists(('osd.db_device', osd_id)):
|
||||
db_device = zkhandler.read(('osd.db_device', osd_id))
|
||||
logger.out('Removing OSD DB logical volume "{}"'.format(db_device), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command('lvremove --yes --force {}'.format(db_device))
|
||||
|
||||
# 8. Delete OSD from ZK
|
||||
logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
|
||||
zkhandler.delete(('osd', osd_id), recursive=True)
|
||||
|
||||
@ -282,6 +306,114 @@ class CephOSDInstance(object):
|
||||
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def add_db_vg(zkhandler, logger, device):
|
||||
logger.out('Creating new OSD database volume group on block device {}'.format(device), state='i')
|
||||
try:
|
||||
# 0. Check if an existsing volume group exists
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'vgdisplay osd-db'
|
||||
)
|
||||
if retcode != 5:
|
||||
logger.out('Ceph OSD database VG "osd-db" already exists', state='e')
|
||||
return False
|
||||
|
||||
# 1. Create an empty partition table
|
||||
logger.out('Creating partitions on block device {}'.format(device), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'sgdisk --clear {}'.format(device)
|
||||
)
|
||||
if retcode:
|
||||
print('sgdisk create partition table')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'sgdisk --new 1:: --typecode 1:8e00 {}'.format(device)
|
||||
)
|
||||
if retcode:
|
||||
print('sgdisk create pv partition')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
# Handle the partition ID portion
|
||||
if search(r'by-path', device) or search(r'by-id', device):
|
||||
# /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
|
||||
partition = '{}-part1'.format(device)
|
||||
elif search(r'nvme', device):
|
||||
# /dev/nvme0n1 -> nvme0n1p1
|
||||
partition = '{}p1'.format(device)
|
||||
else:
|
||||
# /dev/sda -> sda1
|
||||
# No other '/dev/disk/by-*' types are valid for raw block devices anyways
|
||||
partition = '{}1'.format(device)
|
||||
|
||||
# 2. Create the PV
|
||||
logger.out('Creating PV on block device {}'.format(partition), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'pvcreate --force {}'.format(partition)
|
||||
)
|
||||
if retcode:
|
||||
print('pv creation')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
# 2. Create the VG (named 'osd-db')
|
||||
logger.out('Creating VG "osd-db" on block device {}'.format(partition), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'vgcreate --force osd-db {}'.format(partition)
|
||||
)
|
||||
if retcode:
|
||||
print('vg creation')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
# Log it
|
||||
logger.out('Created new OSD database volume group on block device {}'.format(device), state='o')
|
||||
return True
|
||||
except Exception as e:
|
||||
# Log it
|
||||
logger.out('Failed to create OSD database volume group: {}'.format(e), state='e')
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes):
|
||||
logger.out('Creating new OSD database logical volume for OSD ID {}'.format(osd_id), state='i')
|
||||
try:
|
||||
# 0. Check if an existsing logical volume exists
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'lvdisplay osd-db/osd{}'.format(osd_id)
|
||||
)
|
||||
if retcode != 5:
|
||||
logger.out('Ceph OSD database LV "osd-db/osd{}" already exists'.format(osd_id), state='e')
|
||||
return False
|
||||
|
||||
# 1. Determine LV sizing
|
||||
osd_db_size = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)
|
||||
|
||||
# 2. Create the LV
|
||||
logger.out('Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(osd_id, osd_db_size, osd_size_bytes, ext_db_ratio), state='i')
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
'lvcreate --yes --name osd-{} --size {} osd-db'.format(osd_id, osd_db_size)
|
||||
)
|
||||
if retcode:
|
||||
print('db lv creation')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
# Log it
|
||||
logger.out('Created new OSD database logical volume "osd-db/osd-{}"'.format(osd_id), state='o')
|
||||
return True
|
||||
except Exception as e:
|
||||
# Log it
|
||||
logger.out('Failed to create OSD database logical volume: {}'.format(e), state='e')
|
||||
return False
|
||||
|
||||
|
||||
class CephPoolInstance(object):
|
||||
def __init__(self, zkhandler, this_node, name):
|
||||
@ -379,13 +511,15 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
|
||||
|
||||
# Adding a new OSD
|
||||
if command == 'osd_add':
|
||||
node, device, weight = args.split(',')
|
||||
node, device, weight, ext_db_flag, ext_db_ratio = args.split(',')
|
||||
ext_db_flag = bool(strtobool(ext_db_flag))
|
||||
ext_db_ratio = float(ext_db_ratio)
|
||||
if node == this_node.name:
|
||||
# Lock the command queue
|
||||
zk_lock = zkhandler.writelock('base.cmd.ceph')
|
||||
with zk_lock:
|
||||
# Add the OSD
|
||||
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight)
|
||||
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio)
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
@ -426,3 +560,27 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
|
||||
])
|
||||
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||
time.sleep(1)
|
||||
|
||||
# Adding a new DB VG
|
||||
elif command == 'db_vg_add':
|
||||
node, device = args.split(',')
|
||||
if node == this_node.name:
|
||||
# Lock the command queue
|
||||
zk_lock = zkhandler.writelock('base.cmd.ceph')
|
||||
with zk_lock:
|
||||
# Add the VG
|
||||
result = CephOSDInstance.add_db_vg(zkhandler, logger, device)
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
zkhandler.write([
|
||||
('base.cmd.ceph', 'success-{}'.format(data))
|
||||
])
|
||||
# Command failed
|
||||
else:
|
||||
# Update the command queue
|
||||
zkhandler.write([
|
||||
('base.cmd.ceph', 'failure={}'.format(data))
|
||||
])
|
||||
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||
time.sleep(1)
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
import time
|
||||
|
||||
from threading import Thread
|
||||
from threading import Thread, Event
|
||||
|
||||
import daemon_lib.common as common
|
||||
|
||||
@ -86,6 +86,7 @@ class NodeInstance(object):
|
||||
self.storage_cidrnetmask = None
|
||||
# Threads
|
||||
self.flush_thread = None
|
||||
self.flush_event = Event()
|
||||
# Flags
|
||||
self.flush_stopper = False
|
||||
|
||||
@ -159,8 +160,8 @@ class NodeInstance(object):
|
||||
if self.flush_thread is not None:
|
||||
self.logger.out('Waiting for previous migration to complete'.format(self.name), state='i')
|
||||
self.flush_stopper = True
|
||||
while self.flush_stopper:
|
||||
time.sleep(0.1)
|
||||
self.flush_event.wait()
|
||||
self.flush_event.clear()
|
||||
|
||||
# Do flushing in a thread so it doesn't block the migrates out
|
||||
if self.domain_state == 'flush':
|
||||
@ -679,6 +680,7 @@ class NodeInstance(object):
|
||||
# Allow us to cancel the operation
|
||||
if self.flush_stopper:
|
||||
self.logger.out('Aborting node flush'.format(self.name), state='i')
|
||||
self.flush_event.set()
|
||||
self.flush_thread = None
|
||||
self.flush_stopper = False
|
||||
return
|
||||
@ -711,6 +713,7 @@ class NodeInstance(object):
|
||||
|
||||
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
|
||||
ticks = 0
|
||||
self.logger.out('Waiting for migration of VM "{}"'.format(dom_uuid), state='i')
|
||||
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
|
||||
ticks += 1
|
||||
if ticks > 600:
|
||||
@ -733,6 +736,7 @@ class NodeInstance(object):
|
||||
# Allow us to cancel the operation
|
||||
if self.flush_stopper:
|
||||
self.logger.out('Aborting node unflush'.format(self.name), state='i')
|
||||
self.flush_event.set()
|
||||
self.flush_thread = None
|
||||
self.flush_stopper = False
|
||||
return
|
||||
@ -766,8 +770,14 @@ class NodeInstance(object):
|
||||
])
|
||||
|
||||
# Wait for the VM to migrate back
|
||||
ticks = 0
|
||||
self.logger.out('Waiting for migration of VM "{}"'.format(dom_uuid), state='i')
|
||||
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
|
||||
time.sleep(0.1)
|
||||
ticks += 1
|
||||
if ticks > 600:
|
||||
# Abort if we've waited for 120 seconds, the VM is messed and just continue
|
||||
break
|
||||
time.sleep(0.2)
|
||||
|
||||
self.zkhandler.write([
|
||||
(('node.state.domain', self.name), 'ready')
|
||||
|
@ -67,6 +67,9 @@ class VMInstance(object):
|
||||
self.inshutdown = False
|
||||
self.instop = False
|
||||
|
||||
# State thread
|
||||
self.state_thread = None
|
||||
|
||||
# Libvirt domuuid
|
||||
self.dom = self.lookupByUUID(self.domuuid)
|
||||
|
||||
@ -83,8 +86,8 @@ class VMInstance(object):
|
||||
|
||||
# Perform a management command
|
||||
self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
|
||||
state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
|
||||
state_thread.start()
|
||||
self.state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
|
||||
self.state_thread.start()
|
||||
|
||||
# Get data functions
|
||||
def getstate(self):
|
||||
@ -425,42 +428,12 @@ class VMInstance(object):
|
||||
migrate_lock_node.acquire()
|
||||
migrate_lock_state.acquire()
|
||||
|
||||
time.sleep(0.2) # Initial delay for the first writer to grab the lock
|
||||
|
||||
# Don't try to migrate a node to itself, set back to start
|
||||
if self.node == self.lastnode or self.node == self.this_node.name:
|
||||
abort_migrate('Target node matches the current active node during initial check')
|
||||
return
|
||||
|
||||
# Synchronize nodes A (I am reader)
|
||||
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
|
||||
self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
ticks = 0
|
||||
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
|
||||
time.sleep(0.1)
|
||||
ticks += 1
|
||||
if ticks > 300:
|
||||
self.logger.out('Timed out waiting 30s for peer', state='e', prefix='Domain {}'.format(self.domuuid))
|
||||
aborted = True
|
||||
break
|
||||
self.logger.out('Releasing read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
if aborted:
|
||||
abort_migrate('Timed out waiting for peer')
|
||||
return
|
||||
|
||||
# Synchronize nodes B (I am writer)
|
||||
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
time.sleep(0.5) # Time for reader to acquire the lock
|
||||
time.sleep(0.5) # Initial delay for the first writer to grab the lock
|
||||
|
||||
def migrate_live():
|
||||
self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
@ -492,7 +465,6 @@ class VMInstance(object):
|
||||
dest_lv_conn.close()
|
||||
self.console_log_instance.stop()
|
||||
self.removeDomainFromList()
|
||||
|
||||
return True
|
||||
|
||||
def migrate_shutdown():
|
||||
@ -500,9 +472,15 @@ class VMInstance(object):
|
||||
self.shutdown_vm()
|
||||
return True
|
||||
|
||||
do_migrate_shutdown = False
|
||||
self.logger.out('Acquiring lock for migration phase B', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
try:
|
||||
lock.acquire(timeout=30.0)
|
||||
except Exception:
|
||||
abort_migrate('Timed out waiting for peer')
|
||||
return
|
||||
self.logger.out('Acquired lock for migration phase B', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
migrate_live_result = False
|
||||
|
||||
# Do a final verification
|
||||
if self.node == self.lastnode or self.node == self.this_node.name:
|
||||
abort_migrate('Target node matches the current active node during final check')
|
||||
@ -510,7 +488,6 @@ class VMInstance(object):
|
||||
if self.node != target_node:
|
||||
abort_migrate('Target node changed during preparation')
|
||||
return
|
||||
|
||||
if not force_shutdown:
|
||||
# A live migrate is attemped 3 times in succession
|
||||
ticks = 0
|
||||
@ -525,59 +502,20 @@ class VMInstance(object):
|
||||
break
|
||||
else:
|
||||
migrate_live_result = False
|
||||
|
||||
if not migrate_live_result:
|
||||
if force_live:
|
||||
self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid))
|
||||
aborted = True
|
||||
else:
|
||||
do_migrate_shutdown = True
|
||||
|
||||
self.logger.out('Releasing write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
migrate_shutdown()
|
||||
if aborted:
|
||||
abort_migrate('Live migration failed and is required')
|
||||
return
|
||||
|
||||
# Synchronize nodes C (I am writer)
|
||||
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
time.sleep(0.5) # Time for reader to acquire the lock
|
||||
|
||||
if do_migrate_shutdown:
|
||||
migrate_shutdown()
|
||||
|
||||
self.logger.out('Releasing write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
# Synchronize nodes D (I am reader)
|
||||
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
|
||||
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
|
||||
|
||||
self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
# Wait for the receive side to complete before we declare all-done and release locks
|
||||
ticks = 0
|
||||
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '':
|
||||
time.sleep(0.1)
|
||||
ticks += 1
|
||||
if ticks > 100:
|
||||
self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid))
|
||||
break
|
||||
migrate_lock_node.release()
|
||||
migrate_lock_state.release()
|
||||
lock.release()
|
||||
|
||||
self.inmigrate = False
|
||||
return
|
||||
@ -592,55 +530,29 @@ class VMInstance(object):
|
||||
|
||||
self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
# Short delay to ensure sender is in sync
|
||||
time.sleep(0.5)
|
||||
|
||||
# Ensure our lock key is populated
|
||||
self.zkhandler.write([
|
||||
(('domain.migrate.sync_lock', self.domuuid), self.domuuid)
|
||||
])
|
||||
|
||||
# Synchronize nodes A (I am writer)
|
||||
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
time.sleep(1) # Time for reader to acquire the lock
|
||||
self.logger.out('Releasing write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
self.logger.out('Acquiring lock for migration phase A', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
try:
|
||||
lock.acquire(timeout=30.0)
|
||||
except Exception:
|
||||
self.logger.out('Failed to acquire exclusive lock for VM', state='w')
|
||||
return
|
||||
self.logger.out('Acquired lock for migration phase A', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
# Exactly twice the amount of time that the other side is waiting
|
||||
time.sleep(1)
|
||||
lock.release()
|
||||
self.logger.out('Released write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
time.sleep(0.1) # Time for new writer to acquire the lock
|
||||
|
||||
# Synchronize nodes B (I am reader)
|
||||
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
self.logger.out('Acquiring lock for phase C', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
self.logger.out('Releasing read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
# Synchronize nodes C (I am reader)
|
||||
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
self.logger.out('Acquired lock for migration phase C', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
# Set the updated data
|
||||
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
|
||||
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
|
||||
|
||||
self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
# Synchronize nodes D (I am writer)
|
||||
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
|
||||
self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.acquire()
|
||||
self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
time.sleep(0.5) # Time for reader to acquire the lock
|
||||
|
||||
self.state = self.zkhandler.read(('domain.state', self.domuuid))
|
||||
self.dom = self.lookupByUUID(self.domuuid)
|
||||
if self.dom:
|
||||
@ -672,10 +584,7 @@ class VMInstance(object):
|
||||
else:
|
||||
# The send failed or was aborted
|
||||
self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
|
||||
lock.release()
|
||||
self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
|
||||
|
||||
self.zkhandler.write([
|
||||
(('domain.migrate.sync_lock', self.domuuid), '')
|
||||
@ -795,6 +704,9 @@ class VMInstance(object):
|
||||
else:
|
||||
self.terminate_vm()
|
||||
|
||||
self.state_thread = None
|
||||
return
|
||||
|
||||
# This function is a wrapper for libvirt.lookupByUUID which fixes some problems
|
||||
# 1. Takes a text UUID and handles converting it to bytes
|
||||
# 2. Try's it and returns a sensible value if not
|
||||
|
@ -56,8 +56,15 @@ def fence_node(node_name, zkhandler, config, logger):
|
||||
|
||||
# Shoot it in the head
|
||||
fence_status = reboot_via_ipmi(ipmi_hostname, ipmi_username, ipmi_password, logger)
|
||||
|
||||
# Hold to ensure the fence takes effect and system stabilizes
|
||||
time.sleep(config['keepalive_interval'] * 2)
|
||||
logger.out('Waiting {}s for fence of node "{}" to take effect'.format(config['keepalive_interval'], node_name), state='i')
|
||||
time.sleep(config['keepalive_interval'])
|
||||
if fence_status:
|
||||
logger.out('Marking node "{}" as fenced'.format(node_name), state='i')
|
||||
zkhandler.write([
|
||||
(('node.state.daemon', node_name), 'fenced')
|
||||
])
|
||||
|
||||
# Force into secondary network state if needed
|
||||
if node_name in config['coordinators']:
|
||||
@ -119,6 +126,7 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
|
||||
zkhandler.write([
|
||||
(('node.state.domain', node_name), 'flushed')
|
||||
])
|
||||
logger.out('All VMs flushed from dead node "{}" to new hosts'.format(node_name), state='i')
|
||||
|
||||
|
||||
#
|
||||
@ -152,20 +160,20 @@ def reboot_via_ipmi(ipmi_hostname, ipmi_user, ipmi_password, logger):
|
||||
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(ipmi_command_status)
|
||||
|
||||
if ipmi_reset_retcode == 0:
|
||||
if ipmi_status_stdout == "Chassis Power is on":
|
||||
if ipmi_status_stdout.strip() == "Chassis Power is on":
|
||||
# We successfully rebooted the node and it is powered on; this is a succeessful fence
|
||||
logger.out('Successfully rebooted dead node', state='o')
|
||||
return True
|
||||
elif ipmi_status_stdout == "Chassis Power is off":
|
||||
elif ipmi_status_stdout.strip() == "Chassis Power is off":
|
||||
# We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence
|
||||
logger.out('Chassis power is in confirmed off state after successfuly IPMI reboot; proceeding with fence-flush', state='o')
|
||||
return True
|
||||
else:
|
||||
# We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence
|
||||
logger.out('Chassis power is in an unknown state after successful IPMI reboot; not performing fence-flush', state='e')
|
||||
logger.out('Chassis power is in an unknown state ({}) after successful IPMI reboot; not performing fence-flush'.format(ipmi_status_stdout.strip()), state='e')
|
||||
return False
|
||||
else:
|
||||
if ipmi_status_stdout == "Chassis Power is off":
|
||||
if ipmi_status_stdout.strip() == "Chassis Power is off":
|
||||
# We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence
|
||||
logger.out('Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence-flush', state='o')
|
||||
return True
|
||||
@ -181,7 +189,7 @@ def reboot_via_ipmi(ipmi_hostname, ipmi_user, ipmi_password, logger):
|
||||
def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
|
||||
ipmi_command = f'/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status'
|
||||
retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
|
||||
if retcode == 0 and stdout != "Chassis Power is on":
|
||||
if retcode == 0 and stdout.strip() != "Chassis Power is on":
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@ -361,9 +361,13 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
libvirt_name = "qemu:///system"
|
||||
if debug:
|
||||
logger.out("Connecting to libvirt", state='d', prefix='vm-thread')
|
||||
lv_conn = libvirt.open(libvirt_name)
|
||||
if lv_conn is None:
|
||||
try:
|
||||
lv_conn = libvirt.open(libvirt_name)
|
||||
if lv_conn is None:
|
||||
raise Exception
|
||||
except Exception:
|
||||
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
|
||||
return
|
||||
|
||||
memalloc = 0
|
||||
memprov = 0
|
||||
@ -588,23 +592,23 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
|
||||
# Join against running threads
|
||||
if config['enable_hypervisor']:
|
||||
vm_stats_thread.join(timeout=4.0)
|
||||
vm_stats_thread.join(timeout=config['keepalive_interval'])
|
||||
if vm_stats_thread.is_alive():
|
||||
logger.out('VM stats gathering exceeded 4s timeout, continuing', state='w')
|
||||
logger.out('VM stats gathering exceeded timeout, continuing', state='w')
|
||||
if config['enable_storage']:
|
||||
ceph_stats_thread.join(timeout=4.0)
|
||||
ceph_stats_thread.join(timeout=config['keepalive_interval'])
|
||||
if ceph_stats_thread.is_alive():
|
||||
logger.out('Ceph stats gathering exceeded 4s timeout, continuing', state='w')
|
||||
logger.out('Ceph stats gathering exceeded timeout, continuing', state='w')
|
||||
|
||||
# Get information from thread queues
|
||||
if config['enable_hypervisor']:
|
||||
try:
|
||||
this_node.domains_count = vm_thread_queue.get()
|
||||
this_node.memalloc = vm_thread_queue.get()
|
||||
this_node.memprov = vm_thread_queue.get()
|
||||
this_node.vcpualloc = vm_thread_queue.get()
|
||||
this_node.domains_count = vm_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
this_node.memalloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
this_node.memprov = vm_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
this_node.vcpualloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
except Exception:
|
||||
pass
|
||||
logger.out('VM stats queue get exceeded timeout, continuing', state='w')
|
||||
else:
|
||||
this_node.domains_count = 0
|
||||
this_node.memalloc = 0
|
||||
@ -613,10 +617,11 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
|
||||
if config['enable_storage']:
|
||||
try:
|
||||
ceph_health_colour = ceph_thread_queue.get()
|
||||
ceph_health = ceph_thread_queue.get()
|
||||
osds_this_node = ceph_thread_queue.get()
|
||||
ceph_health_colour = ceph_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
ceph_health = ceph_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
osds_this_node = ceph_thread_queue.get(timeout=config['keepalive_interval'])
|
||||
except Exception:
|
||||
logger.out('Ceph stats queue get exceeded timeout, continuing', state='w')
|
||||
ceph_health_colour = logger.fmt_cyan
|
||||
ceph_health = 'UNKNOWN'
|
||||
osds_this_node = '?'
|
||||
|
Reference in New Issue
Block a user