Update OSD replacement functionality

1. Simplify this command by leveraging the existing remove_osd/add_osd
functions, since its task was functionally identical to running those
two functions in sequence.
2. Add support for split OSDs within the command (replacing all OSDs on
the block device(s) as required).
3. Add additional configurability and flexibility around the old device,
weight, and external DB LVs.
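
In outline, the reworked replace_osd is now a thin wrapper over those two
functions. A minimal sketch of the resulting flow, assuming the names shown
in the diff below (error handling, logging, old-device detection, and the
external-DB sizing are omitted):

# Sketch of the new flow only; see the diff below for the real code.
def replace_osd_outline(zkhandler, logger, node, osd_id, new_device,
                        old_device=None, weight=None,
                        ext_db_ratio=None, ext_db_size=None):
    # Find every OSD sharing the old block device (covers split OSDs)
    osd_block = zkhandler.read(("osd.device", osd_id))
    all_osds_on_block = [
        o for o in get_list_osd(zkhandler, None)
        if o["node"] == node and o["device"] == osd_block
    ]
    # Remove each one, forcing past errors since the disk may be dead
    for osd in all_osds_on_block:
        CephOSDInstance.remove_osd(zkhandler, logger, osd["id"], force_flag=True)
    # Recreate on the new device, reusing the old weight and split layout
    if weight is None:
        weight = all_osds_on_block[0]["stats"]["weight"]
    split_count = len(all_osds_on_block) if len(all_osds_on_block) > 1 else None
    return CephOSDInstance.add_osd(
        zkhandler, logger, node, new_device, weight,
        ext_db_ratio=ext_db_ratio, ext_db_size=ext_db_size,
        split_count=split_count,
    )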
2023-11-03 01:45:49 -04:00
parent 3cb8a70f04
commit 64e37ae963
6 changed files with 279 additions and 279 deletions

@@ -23,7 +23,7 @@ import time
import json
import daemon_lib.common as common
from daemon_lib.ceph import format_bytes_fromhuman
from daemon_lib.ceph import format_bytes_fromhuman, get_list_osd
from distutils.util import strtobool
from re import search, match, sub
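
A note on this hunk's imports: the new find_osds_from_block helper below
parses ceph-volume's JSON output with jloads(), which is not added here;
presumably the module already carries an alias along these lines (an
assumption, not shown in this diff):

from json import loads as jloads  # assumed existing alias for json.loads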
@@ -393,7 +393,7 @@ class CephOSDInstance(object):
raise Exception
# 4d. Get the list of created OSDs on the device (final pass)
logger.out(f"(Requerying OSD(s) on disk {device}", state="i")
logger.out(f"Requerying OSD(s) on disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list --format json {device}"
)
@@ -493,10 +493,11 @@ class CephOSDInstance(object):
logger,
node,
osd_id,
old_device,
new_device,
weight,
ext_db_flag=False,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
# Handle a detect device string if one was passed
if match(r"detect:", new_device):
@@ -514,223 +515,105 @@ class CephOSDInstance(object):
)
new_device = ddevice
# We are ready to create a new OSD on this node
logger.out(
"Replacing OSD {} disk with block device {}".format(osd_id, new_device),
state="i",
)
try:
# Verify the OSD is present
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
osd_list = stdout.split("\n")
if osd_id not in osd_list:
logger.out(
"Could not find OSD {} in the cluster".format(osd_id), state="e"
)
return True
# 1. Set the OSD down and out so it will flush
logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
# Phase 1: Try to determine what we can about the old device
def find_osds_from_block(device):
# Try to query the passed block device directly
logger.out(f"Querying for OSD(s) on disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd down {}".format(osd_id)
f"ceph-volume lvm list --format json {device}"
)
if retcode:
print("ceph osd down")
print(stdout)
print(stderr)
raise Exception
logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd out {}".format(osd_id)
)
if retcode:
print("ceph osd out")
print(stdout)
print(stderr)
raise Exception
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
while True:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd safe-to-destroy osd.{osd_id}"
)
if retcode in [0, 11]:
# Code 0 = success
# Code 11 = "Error EAGAIN: OSD(s) 5 have no reported stats, and not all PGs are active+clean; we cannot draw any conclusions." which means all PGs have been remappped but backfill is still occurring
break
else:
time.sleep(5)
# 3. Stop the OSD process
logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"systemctl stop ceph-osd@{}".format(osd_id)
)
if retcode:
print("systemctl stop")
print(stdout)
print(stderr)
raise Exception
time.sleep(2)
# 4. Destroy the OSD
logger.out("Destroying OSD with ID {osd_id}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
)
if retcode:
print("ceph osd destroy")
print(stdout)
print(stderr)
raise Exception
# 5. Adjust the weight
logger.out(
"Adjusting weight of OSD disk with ID {} in CRUSH map".format(osd_id),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph osd crush reweight osd.{osdid} {weight}".format(
osdid=osd_id, weight=weight
)
)
if retcode:
print("ceph osd crush reweight")
print(stdout)
print(stderr)
raise Exception
# 6a. Zap the new disk to ensure it is ready to go
logger.out("Zapping disk {}".format(new_device), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm zap --destroy {}".format(new_device)
)
if retcode:
print("ceph-volume lvm zap")
print(stdout)
print(stderr)
raise Exception
dev_flags = "--data {}".format(new_device)
# 6b. Prepare the logical volume if ext_db_flag
if ext_db_flag:
db_device = "osd-db/osd-{}".format(osd_id)
dev_flags += " --block.db {}".format(db_device)
found_osds = []
else:
db_device = ""
found_osds = jloads(stdout)
# 6c. Replace the OSD
logger.out(
"Preparing LVM for replaced OSD {} disk on {}".format(
osd_id, new_device
),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm prepare --osd-id {osdid} --bluestore {devices}".format(
osdid=osd_id, devices=dev_flags
return found_osds
real_old_device = None
osd_block = zkhandler.read(("osd.device", osd_id))
# Determine information from a passed old_device
if old_device is not None:
found_osds = find_osds_from_block(old_device)
if found_osds and osd_id in found_osds.keys():
real_old_device = old_device
else:
logger.out(
f"No OSD(s) found on disk {old_device}; falling back to PVC detection",
state="w",
)
)
if retcode:
print("ceph-volume lvm prepare")
print(stdout)
print(stderr)
raise Exception
# 7a. Get OSD information
# Try to get an old_device from our PVC information
if real_old_device is None:
found_osds = find_osds_from_block(osd_block)
if osd_id in found_osds.keys():
real_old_device = osd_block
if real_old_device is None:
skip_zap = True
logger.out(
"Getting OSD information for ID {} on {}".format(osd_id, new_device),
state="i",
"No valid old block device found for OSD; skipping zap", state="w"
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm list {device}".format(device=new_device)
)
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]
if not osd_fsid:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
raise Exception
# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")
# Reset whatever we were given to Ceph's /dev/xdX naming
if new_device != osd_device:
new_device = osd_device
# 7b. Activate the OSD
logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
osdid=osd_id, osdfsid=osd_fsid
)
)
if retcode:
print("ceph-volume lvm activate")
print(stdout)
print(stderr)
raise Exception
time.sleep(0.5)
# 8. Verify it started
retcode, stdout, stderr = common.run_os_command(
"systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
)
if retcode:
print("systemctl status")
print(stdout)
print(stderr)
raise Exception
# 9. Update Zookeeper information
else:
skip_zap = False
logger.out(
"Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
)
zkhandler.write(
[
(("osd", osd_id), ""),
(("osd.node", osd_id), node),
(("osd.device", osd_id), new_device),
(("osd.db_device", osd_id), db_device),
(("osd.fsid", osd_id), ""),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
f"Found source OSD(s) on block device {real_old_device}", state="i"
)
# Log it
logger.out(
"Replaced OSD {} disk with device {}".format(osd_id, new_device),
state="o",
# Try to determine if any other OSDs shared a block device with this OSD
all_osds_on_block = [
o
for o in get_list_osd(zkhandler, None)
if o["node"] == node and o["device"] == osd_block
]
# Remove each OSD on the block device
for osd in all_osds_on_block:
result = CephOSDInstance.remove_osd(
zkhandler, logger, osd["id"], force_flag=True, skip_zap_flag=skip_zap
)
return True
except Exception as e:
# Log it
logger.out("Failed to replace OSD {} disk: {}".format(osd_id, e), state="e")
return False
# Determine the weight of the OSD(s)
if weight is None:
weight = all_osds_on_block[0]["stats"]["weight"]
# Determine how many split OSD(s) to recreate
if len(all_osds_on_block) > 1 and all_osds_on_block[0]["is_split"]:
split_count = len(all_osds_on_block)
else:
split_count = None
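# e.g. (hypothetical numbers): three split OSDs sharing one device give
# len(all_osds_on_block) == 3 with is_split set, so split_count = 3 and
# add_osd recreates the same 3-way split on the new block device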
# Determine if an ext_db should be re-added
if ext_db_ratio is not None:
osd_db_ratio = ext_db_ratio
osd_db_size = None
elif ext_db_size is not None:
osd_db_ratio = None
osd_db_size = ext_db_size
elif all_osds_on_block[0]["db_device"]:
_, osd_db_size_bytes, _ = common.run_os_command(
f"blockdev --getsize64 {all_osds_on_block[0]['db_device']}"
)
osd_db_ratio = None
osd_db_size = f"{osd_db_size_bytes}B"
else:
osd_db_ratio = None
osd_db_size = None
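# Net precedence: an explicit ext_db_ratio wins, then an explicit
# ext_db_size; failing both, the measured byte size of the old OSD's DB
# LV is reused, and if there was no DB device none is created at all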
# Create the new OSD(s) on the new block device
result = CephOSDInstance.add_osd(
zkhandler,
logger,
node,
new_device,
weight,
ext_db_ratio=osd_db_ratio,
ext_db_size=osd_db_size,
split_count=split_count,
)
return result
@staticmethod
def refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag):
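
For context on the checks above: ceph-volume lvm list --format json keys its
output by OSD id, which is why find_osds_from_block results can be tested
with osd_id in found_osds.keys(). A trimmed sketch of the shape (values
hypothetical):

{
    "5": [
        {
            "type": "block",
            "devices": ["/dev/sdb"],
            "lv_path": "/dev/ceph-<vg_uuid>/osd-block-<lv_uuid>",
            "tags": {"ceph.osd_id": "5", "ceph.osd_fsid": "<fsid>"}
        }
    ]
}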
@@ -863,7 +746,7 @@ class CephOSDInstance(object):
return False
@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
def remove_osd(zkhandler, logger, osd_id, force_flag=False, skip_zap_flag=False):
logger.out("Removing OSD {}".format(osd_id), state="i")
try:
# Verify the OSD is present
@@ -931,32 +814,33 @@ class CephOSDInstance(object):
raise Exception
time.sleep(2)
# 4. Determine the block devices
osd_vg = zkhandler.read(("osd.vg", osd_id))
osd_lv = zkhandler.read(("osd.lv", osd_id))
osd_lvm = f"/dev/{osd_vg}/{osd_lv}"
osd_device = None
if not skip_zap_flag:
# 4. Determine the block devices
osd_vg = zkhandler.read(("osd.vg", osd_id))
osd_lv = zkhandler.read(("osd.lv", osd_id))
osd_lvm = f"/dev/{osd_vg}/{osd_lv}"
osd_device = None
logger.out(
f"Getting disk info for OSD {osd_id} LV {osd_lvm}",
state="i",
)
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list {osd_lvm}"
)
for line in stdout.split("\n"):
if "devices" in line:
osd_device = line.split()[-1]
logger.out(
f"Getting disk info for OSD {osd_id} LV {osd_lvm}",
state="i",
)
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list {osd_lvm}"
)
for line in stdout.split("\n"):
if "devices" in line:
osd_device = line.split()[-1]
if not osd_device:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
if not osd_device:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
# 5. Purge the OSD from Ceph
logger.out("Purging OSD {osd_id}", state="i")
@@ -976,15 +860,17 @@ class CephOSDInstance(object):
else:
raise Exception
# 7. Remove the DB device
if zkhandler.exists(("osd.db_device", osd_id)):
db_device = zkhandler.read(("osd.db_device", osd_id))
logger.out(
'Removing OSD DB logical volume "{}"'.format(db_device), state="i"
)
retcode, stdout, stderr = common.run_os_command(
"lvremove --yes --force {}".format(db_device)
)
if not skip_zap_flag:
# 7. Remove the DB device
if zkhandler.exists(("osd.db_device", osd_id)):
db_device = zkhandler.read(("osd.db_device", osd_id))
logger.out(
'Removing OSD DB logical volume "{}"'.format(db_device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"lvremove --yes --force {}".format(db_device)
)
# 8. Delete OSD from ZK
logger.out("Deleting OSD {} from Zookeeper".format(osd_id), state="i")
@@ -1307,8 +1193,19 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Replacing an OSD
if command == "osd_replace":
node, osd_id, old_device, new_device, weight, ext_db_flag = args.split(",")
ext_db_flag = bool(strtobool(ext_db_flag))
(
node,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
) = args.split(",")
old_device = None if old_device == "None" else old_device
weight = None if weight == "None" else weight
ext_db_ratio = None if ext_db_ratio == "None" else ext_db_ratio
ext_db_size = None if ext_db_size == "None" else ext_db_size
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock("base.cmd.ceph")
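
Since the command channel carries a flat comma-joined string, optional
fields travel as the literal string "None" and are decoded back to Python
None above. A hypothetical sender-side composition:

# Hypothetical sender: replace OSD 5 with /dev/nvme0n1, all options defaulted
args = ",".join(["hv1", "5", "/dev/nvme0n1", "None", "None", "None", "None"])
# the receiver's args.split(",") then yields the seven fields parsed above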
@@ -1319,10 +1216,11 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
logger,
node,
osd_id,
old_device,
new_device,
old_device,
weight,
ext_db_flag,
ext_db_ratio,
ext_db_size,
)
# Command succeeded
if result:
@@ -1373,7 +1271,7 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
with zk_lock:
# Remove the OSD
result = CephOSDInstance.remove_osd(
zkhandler, logger, osd_id, d_osd[osd_id], force_flag
zkhandler, logger, osd_id, force_flag
)
# Command succeeded
if result: