Files
pvc/node-daemon/pvcnoded/objects/CephInstance.py
Joshua M. Boniface adc8a5a3bc Add separate OSD DB device support
Adds in three parts:

1. Create an API endpoint to create OSD DB volume groups on a device.
Passed through to the node via the same command pipeline as
creating/removing OSDs, and creates a volume group with a fixed name
(osd-db).

2. Adds API support for specifying whether or not to use this DB volume
group when creating a new OSD via the "ext_db" flag. Naming and sizing
is fixed for simplicity and based on Ceph recommendations (5% of OSD
size). The Zookeeper schema tracks the block device to use during
removal.

3. Adds CLI support for the new and modified API endpoints, as well as
displaying the block device and DB block device in the OSD list.

While I debated supporting adding a DB device to an existing OSD, in
practice this ended up being a very complex operation involving stopping
the OSD and setting some options, so this is not supported; this can be
specified during OSD creation only.

Closes #142
2021-09-23 13:59:49 -04:00

564 lines
22 KiB
Python

#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import json
import psutil
import daemon_lib.common as common
from distutils.util import strtobool
class CephOSDInstance(object):
    """Local view of one Ceph OSD plus the node-side OSD management commands.

    An instance mirrors the ``osd.node`` and ``osd.stats`` Zookeeper keys of a
    single OSD ID through two data watches registered in ``__init__``.

    The static methods run the shell commands that create and remove OSDs and
    the shared ``osd-db`` database volume group. Each of them returns True on
    success and False on failure, and reports progress through ``logger.out``.
    """

    def __init__(self, zkhandler, this_node, osd_id):
        """Store the handles and register the Zookeeper watches for this OSD.

        Args:
            zkhandler: Handler exposing ``zk_conn.DataWatch`` and ``schema.path``.
            this_node: Object describing the local node (stored, not used here).
            osd_id: ID of the OSD to track.
        """
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.osd_id = osd_id
        self.node = None
        self.size = None
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
        def watch_osd_node(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                # No data (e.g. None) was delivered for the key
                data = ''

            if data and data != self.node:
                self.node = data

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
        def watch_osd_stats(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                # No data (e.g. None) was delivered for the key
                data = ''

            if data and data != self.stats:
                self.stats = json.loads(data)

    @staticmethod
    def _require_success(label, retcode, stdout, stderr):
        """Raise RuntimeError naming ``label`` when ``retcode`` is non-zero.

        The command output is printed first so that it still reaches the
        daemon's stdout for debugging, as the previous inline checks did.
        """
        if retcode:
            print(label)
            print(stdout)
            print(stderr)
            raise RuntimeError('"{}" failed with return code {}'.format(label, retcode))

    @staticmethod
    def _first_partition(device):
        """Return the path of partition 1 on ``device``.

        Linux inserts a ``p`` between the disk name and the partition number
        when the disk name itself ends in a digit (``/dev/nvme0n1`` ->
        ``/dev/nvme0n1p1``); otherwise the number is appended directly
        (``/dev/sdb`` -> ``/dev/sdb1``). Persistent ``/dev/disk/by-*`` aliases
        are not handled specially.
        """
        separator = 'p' if device[-1:].isdigit() else ''
        return '{}{}1'.format(device, separator)

    @staticmethod
    def _cmdline_is_osd(cmdline, osd_id):
        """Return True if ``cmdline`` carries ``--id <osd_id>`` as an exact pair.

        ``cmdline`` may be None (psutil reports None when it cannot read the
        command line). Matching whole tokens avoids treating ``--id 10`` as a
        match for OSD ``1``.
        """
        tokens = ' '.join(cmdline or []).split()
        wanted = str(osd_id)
        return any(
            flag == '--id' and value == wanted
            for flag, value in zip(tokens, tokens[1:])
        )

    @staticmethod
    def _osd_process_running(osd_id):
        """Return True while a ``ceph-osd`` process for ``osd_id`` exists."""
        for proc in psutil.process_iter(attrs=['name', 'cmdline']):
            if proc.info['name'] == 'ceph-osd' and CephOSDInstance._cmdline_is_osd(proc.info['cmdline'], osd_id):
                return True
        return False

    @staticmethod
    def add_osd(zkhandler, logger, node, device, weight, ext_db_flag=False):
        """Create a new OSD on ``device`` and register it in Zookeeper.

        Args:
            zkhandler: Zookeeper handler used to record the new OSD.
            logger: Logger exposing ``out(message, state=...)``.
            node: Host name used as the CRUSH ``host=`` bucket and stored in ZK.
            device: Block device that will hold the OSD data.
            weight: CRUSH weight passed to ``ceph osd crush add``.
            ext_db_flag: When true, create an LV in the ``osd-db`` volume
                group and use it as the OSD's ``--block.db`` device.

        Returns:
            True when every step succeeded, False otherwise (the failure is
            logged; nothing is rolled back).
        """
        check = CephOSDInstance._require_success

        # We are ready to create a new OSD on this node
        logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
        try:
            # 1. Create an OSD; we do this so we know what ID will be gen'd
            retcode, stdout, stderr = common.run_os_command('ceph osd create')
            check('ceph osd create', retcode, stdout, stderr)
            osd_id = stdout.rstrip()

            # 2. Remove that newly-created OSD
            retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
            check('ceph osd rm', retcode, stdout, stderr)

            # 3a. Zap the disk to ensure it is ready to go
            logger.out('Zapping disk {}'.format(device), state='i')
            retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(device))
            check('ceph-volume lvm zap', retcode, stdout, stderr)

            dev_flags = "--data {}".format(device)

            # 3b. Prepare the logical volume if ext_db_flag
            if ext_db_flag:
                retcode, stdout, stderr = common.run_os_command('blockdev --getsize64 {}'.format(device))
                check('blockdev --getsize64', retcode, stdout, stderr)
                osd_size_bytes = int(stdout)

                if not CephOSDInstance.create_osd_db_lv(zkhandler, logger, osd_id, osd_size_bytes):
                    raise RuntimeError('Could not create the OSD database logical volume')

                db_device = "osd-db/osd-{}".format(osd_id)
                dev_flags += " --block.db {}".format(db_device)
            else:
                db_device = ""

            # 3c. Create the OSD for real
            logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'ceph-volume lvm prepare --bluestore {devices}'.format(
                    devices=dev_flags
                )
            )
            check('ceph-volume lvm prepare', retcode, stdout, stderr)

            # 4a. Get OSD FSID
            logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'ceph-volume lvm list {device}'.format(
                    device=device
                )
            )
            # Start from None so that a missing "osd fsid" line is reported
            # below instead of surfacing as an unbound-variable error.
            osd_fsid = None
            for line in stdout.split('\n'):
                if 'osd fsid' in line:
                    osd_fsid = line.split()[-1]

            if not osd_fsid:
                print('ceph-volume lvm list')
                print('Could not find OSD fsid in data:')
                print(stdout)
                print(stderr)
                raise RuntimeError('Could not find OSD fsid in "ceph-volume lvm list" output')

            # 4b. Activate the OSD
            logger.out('Activating new OSD disk with ID {}'.format(osd_id), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'ceph-volume lvm activate --bluestore {osdid} {osdfsid}'.format(
                    osdid=osd_id,
                    osdfsid=osd_fsid
                )
            )
            check('ceph-volume lvm activate', retcode, stdout, stderr)

            # 5. Add it to the crush map
            logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'ceph osd crush add osd.{osdid} {weight} root=default host={node}'.format(
                    osdid=osd_id,
                    weight=weight,
                    node=node
                )
            )
            check('ceph osd crush add', retcode, stdout, stderr)

            # Give the OSD service a moment before checking on it
            time.sleep(0.5)

            # 6. Verify it started
            retcode, stdout, stderr = common.run_os_command(
                'systemctl status ceph-osd@{osdid}'.format(
                    osdid=osd_id
                )
            )
            check('systemctl status', retcode, stdout, stderr)

            # 7. Add the new OSD to the list
            logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
            zkhandler.write([
                (('osd', osd_id), ''),
                (('osd.node', osd_id), node),
                (('osd.device', osd_id), device),
                (('osd.db_device', osd_id), db_device),
                (('osd.stats', osd_id), '{}'),
            ])

            # Log it
            logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
            return True
        except Exception as e:
            # Log it
            logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
            return False

    @staticmethod
    def remove_osd(zkhandler, logger, osd_id, osd_obj):
        """Drain, stop, zap and purge an OSD, then delete it from Zookeeper.

        Args:
            zkhandler: Zookeeper handler holding the OSD's keys.
            logger: Logger exposing ``out(message, state=...)``.
            osd_id: ID of the OSD to remove, as a string (it is compared
                against the lines printed by ``ceph osd ls``).
            osd_obj: The tracked instance for this OSD (currently unused).

        Returns:
            True when the OSD was removed or was not present in the cluster,
            False when any step failed.
        """
        check = CephOSDInstance._require_success

        logger.out('Removing OSD disk {}'.format(osd_id), state='i')
        try:
            # 1. Verify the OSD is present
            retcode, stdout, stderr = common.run_os_command('ceph osd ls')
            osd_list = stdout.split('\n')
            if osd_id not in osd_list:
                logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
                return True

            # 2. Set the OSD out so it will flush
            logger.out('Setting out OSD disk with ID {}'.format(osd_id), state='i')
            retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
            check('ceph osd out', retcode, stdout, stderr)

            # 3. Wait for the OSD to flush
            # This is best-effort: if the PG count cannot be determined we stop
            # waiting and carry on rather than failing the removal.
            logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
            while True:
                try:
                    retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
                    # Look the OSD up afresh on every pass so that a stale
                    # entry from an earlier pass can never keep us waiting.
                    osd_stats = None
                    for entry in json.loads(stdout):
                        if str(entry['osd']) == osd_id:
                            osd_stats = entry
                    num_pgs = osd_stats['num_pgs']
                except Exception as e:
                    logger.out('Could not determine PG count for OSD {}, not waiting further: {}'.format(osd_id, e), state='i')
                    break

                if num_pgs > 0:
                    time.sleep(5)
                else:
                    break

            # 4. Stop the OSD process and wait for it to be terminated
            logger.out('Stopping OSD disk with ID {}'.format(osd_id), state='i')
            retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
            check('systemctl stop', retcode, stdout, stderr)

            # FIXME: There has to be a better way to do this /shrug
            # Poll for a ceph-osd process with arg '--id {id}', pausing between
            # polls so the wait does not spin a CPU core.
            while CephOSDInstance._osd_process_running(osd_id):
                time.sleep(0.5)

            # 5. Determine the block devices
            retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
            vg_name = stdout.split('/')[-2]  # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
            retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
            pv_block = stdout.strip()

            # 6. Zap the volumes
            logger.out('Zapping OSD disk with ID {} on {}'.format(osd_id, pv_block), state='i')
            retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
            check('ceph-volume lvm zap', retcode, stdout, stderr)

            # 7. Purge the OSD from Ceph
            logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
            retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
            check('ceph osd purge', retcode, stdout, stderr)

            # 8. Remove the DB device
            # add_osd stores an empty string for OSDs without an external DB,
            # so the key existing is not enough; the value must be non-empty.
            if zkhandler.exists(('osd.db_device', osd_id)):
                db_device = zkhandler.read(('osd.db_device', osd_id))
                if db_device:
                    logger.out('Removing OSD DB logical volume "{}"'.format(db_device), state='i')
                    retcode, stdout, stderr = common.run_os_command('lvremove --yes --force {}'.format(db_device))
                    if retcode:
                        # Not fatal: the OSD itself is already gone at this point
                        logger.out('Failed to remove OSD DB logical volume "{}": {}'.format(db_device, stderr), state='e')

            # 9. Delete OSD from ZK
            logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
            zkhandler.delete(('osd', osd_id), recursive=True)

            # Log it
            logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
            return True
        except Exception as e:
            # Log it
            logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
            return False

    @staticmethod
    def add_db_vg(zkhandler, logger, device):
        """Partition ``device`` and create the ``osd-db`` volume group on it.

        Args:
            zkhandler: Zookeeper handler (unused here).
            logger: Logger exposing ``out(message, state=...)``.
            device: Block device to dedicate to OSD database volumes.

        Returns:
            True on success; False if ``osd-db`` already exists or a step failed.
        """
        check = CephOSDInstance._require_success
        partition = CephOSDInstance._first_partition(device)

        logger.out('Creating new OSD database volume group on block device {}'.format(device), state='i')
        try:
            # 0. Check if an existing volume group exists
            # The code treats return code 5 as "no such volume group"; any
            # other code is taken to mean the VG is already present.
            retcode, stdout, stderr = common.run_os_command(
                'vgdisplay osd-db'
            )
            if retcode != 5:
                logger.out('Ceph OSD database VG "osd-db" already exists', state='e')
                return False

            # 1. Create an empty partition table
            # NOTE(review): this command string contains a shell pipe; it only
            # works if common.run_os_command runs commands through a shell -
            # confirm against that helper.
            logger.out('Creating empty partiton table on block device {}'.format(device), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'echo -e "o\ny\nn\n\n\n\n8e00\nw\ny\n" | sudo gdisk {}'.format(device)
            )
            check('gdisk partitioning', retcode, stdout, stderr)

            # 2. Create the PV
            logger.out('Creating PV on block device {}'.format(partition), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'pvcreate --force {}'.format(partition)
            )
            check('pv creation', retcode, stdout, stderr)

            # 3. Create the VG (named 'osd-db')
            logger.out('Creating VG "osd-db" on block device {}'.format(partition), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'vgcreate --force osd-db {}'.format(partition)
            )
            check('vg creation', retcode, stdout, stderr)

            # Log it
            logger.out('Created new OSD database volume group on block device {}'.format(device), state='o')
            return True
        except Exception as e:
            # Log it
            logger.out('Failed to create OSD database volume group: {}'.format(e), state='e')
            return False

    @staticmethod
    def create_osd_db_lv(zkhandler, logger, osd_id, osd_size_bytes):
        """Create the ``osd-db/osd-<osd_id>`` logical volume for an OSD's DB.

        Args:
            zkhandler: Zookeeper handler (unused here).
            logger: Logger exposing ``out(message, state=...)``.
            osd_id: ID of the OSD the volume belongs to.
            osd_size_bytes: Size of the OSD data device in bytes; the LV is
                sized at 5% of this value.

        Returns:
            True on success; False if the LV already exists or creation failed.
        """
        lv_path = 'osd-db/osd-{}'.format(osd_id)

        logger.out('Creating new OSD database logical volume for OSD ID {}'.format(osd_id), state='i')
        try:
            # 0. Check if an existing logical volume exists
            # The name checked here must be the same one created below.
            retcode, stdout, stderr = common.run_os_command(
                'lvdisplay {}'.format(lv_path)
            )
            if retcode != 5:
                logger.out('Ceph OSD database LV "{}" already exists'.format(lv_path), state='e')
                return False

            # 1. Determine LV sizing (5% of OSD size, in MB)
            osd_db_size = int(osd_size_bytes * 0.05 / 1024 / 1024)

            # 2. Create the LV
            logger.out('Creating LV "{}"'.format(lv_path), state='i')
            retcode, stdout, stderr = common.run_os_command(
                'lvcreate --yes --name osd-{} --size {} osd-db'.format(osd_id, osd_db_size)
            )
            CephOSDInstance._require_success('db lv creation', retcode, stdout, stderr)

            # Log it
            logger.out('Created new OSD database logical volume "{}"'.format(lv_path), state='o')
            return True
        except Exception as e:
            # Log it
            logger.out('Failed to create OSD database logical volume: {}'.format(e), state='e')
            return False
class CephPoolInstance(object):
    """Local mirror of one Ceph pool's ``pool.pgs`` and ``pool.stats`` keys.

    Two Zookeeper data watches, registered at construction time, keep the
    ``pgs`` and ``stats`` attributes in step with the cluster state.
    """

    def __init__(self, zkhandler, this_node, name):
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.name = name
        self.pgs = ''
        self.stats = dict()

        def _as_text(raw):
            # Values without a decode() method (e.g. None) count as empty
            try:
                return raw.decode('ascii')
            except AttributeError:
                return ''

        def _key_deleted(event):
            # A DELETED event means this instance is about to be reaped in
            # Daemon.py, so the watcher should terminate itself
            return bool(event and event.type == 'DELETED')

        pgs_path = self.zkhandler.schema.path('pool.pgs', self.name)

        @self.zkhandler.zk_conn.DataWatch(pgs_path)
        def watch_pool_node(data, stat, event=''):
            if _key_deleted(event):
                return False

            text = _as_text(data)
            if text and text != self.pgs:
                self.pgs = text

        stats_path = self.zkhandler.schema.path('pool.stats', self.name)

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_pool_stats(data, stat, event=''):
            if _key_deleted(event):
                return False

            text = _as_text(data)
            if text and text != self.stats:
                self.stats = json.loads(text)
class CephVolumeInstance(object):
    """Local mirror of one Ceph volume's ``volume.stats`` Zookeeper key.

    The volume is addressed as ``<pool>/<name>``; a data watch registered at
    construction time keeps the ``stats`` attribute current.
    """

    def __init__(self, zkhandler, this_node, pool, name):
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.pool = pool
        self.name = name
        self.stats = dict()

        stats_path = self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}')

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_volume_stats(data, stat, event=''):
            # A DELETED event means this instance is about to be reaped in
            # Daemon.py, so the watcher terminates itself by returning False
            if event and event.type == 'DELETED':
                return False

            # Values without a decode() method (e.g. None) count as empty
            try:
                payload = data.decode('ascii')
            except AttributeError:
                payload = ''

            if not payload:
                return None
            if payload != self.stats:
                self.stats = json.loads(payload)
class CephSnapshotInstance(object):
    """Local mirror of one Ceph snapshot's ``snapshot.stats`` Zookeeper key.

    The snapshot is addressed as ``<pool>/<volume>/<name>``; a data watch
    registered at construction time keeps the ``stats`` attribute current.
    """

    def __init__(self, zkhandler, this_node, pool, volume, name):
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.pool = pool
        self.volume = volume
        self.name = name
        self.stats = dict()

        stats_path = self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}')

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_snapshot_stats(data, stat, event=''):
            # A DELETED event means this instance is about to be reaped in
            # Daemon.py, so the watcher terminates itself by returning False
            if event and event.type == 'DELETED':
                return False

            # Values without a decode() method (e.g. None) count as empty
            try:
                payload = data.decode('ascii')
            except AttributeError:
                payload = ''

            if not payload:
                return None
            if payload != self.stats:
                self.stats = json.loads(payload)
def _run_locked_ceph_command(zkhandler, data, action):
    """Run ``action`` under the Ceph command lock and publish its outcome.

    The result is written back to the ``base.cmd.ceph`` key as
    ``success-<data>`` or ``failure-<data>`` depending on the truthiness of
    the value ``action()`` returns.
    """
    # Lock the command queue
    zk_lock = zkhandler.writelock('base.cmd.ceph')
    with zk_lock:
        result = action()

        # Update the command queue
        status = 'success' if result else 'failure'
        zkhandler.write([
            ('base.cmd.ceph', '{}-{}'.format(status, data))
        ])

        # Wait 1 second before we free the lock, to ensure the client hits the lock
        time.sleep(1)


# Primary command function
# This command pipe is used for OSD adds and removes and for OSD DB VG adds
def ceph_command(zkhandler, logger, this_node, data, d_osd):
    """Dispatch one Ceph command string to the matching local action.

    ``data`` has the form ``"<command> <args>"`` where ``<args>`` is a
    comma-separated list without spaces. Recognised commands:

    * ``osd_add node,device,weight,ext_db_flag``
    * ``osd_remove osd_id``
    * ``db_vg_add node,device``

    A command is only executed when it targets this node (``this_node.name``);
    otherwise, and for any unrecognised command, nothing happens.

    Args:
        zkhandler: Zookeeper handler used for locking and result reporting.
        logger: Logger exposing ``out(message, state=...)``.
        this_node: Object whose ``name`` identifies the local node.
        data: The raw command string.
        d_osd: Mapping of OSD ID to the tracked OSD instance.
    """
    # Get the command and args
    parts = data.split()
    if len(parts) != 2:
        logger.out('Ignoring malformed Ceph command "{}"'.format(data), state='e')
        return
    command, args = parts

    # Adding a new OSD
    if command == 'osd_add':
        node, device, weight, ext_db_flag = args.split(',')
        ext_db_flag = bool(strtobool(ext_db_flag))
        if node == this_node.name:
            _run_locked_ceph_command(
                zkhandler, data,
                lambda: CephOSDInstance.add_osd(zkhandler, logger, node, device, weight, ext_db_flag)
            )

    # Removing an OSD
    elif command == 'osd_remove':
        osd_id = args
        # Verify osd_id is in the list; an unknown ID is simply not ours to remove
        osd_obj = d_osd[osd_id] if osd_id in d_osd else None
        if osd_obj and osd_obj.node == this_node.name:
            _run_locked_ceph_command(
                zkhandler, data,
                lambda: CephOSDInstance.remove_osd(zkhandler, logger, osd_id, osd_obj)
            )

    # Adding a new DB VG
    elif command == 'db_vg_add':
        node, device = args.split(',')
        if node == this_node.name:
            _run_locked_ceph_command(
                zkhandler, data,
                lambda: CephOSDInstance.add_db_vg(zkhandler, logger, device)
            )