Add support for split OSD adds
Allows creating multiple OSDs on a single (NVMe) block device, leveraging the "ceph-volume lvm batch" command. Replaces the previous method of creating OSDs. Also adds a new ZK item for each OSD indicating if it is split or not.
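For orientation, the heart of the new flow is a single "ceph-volume lvm batch" call whose --osds-per-device flag carries the requested split count. Below is a minimal sketch of how add_osd() assembles that invocation; the device path and split values are hypothetical examples, while the flag names mirror the diff that follows:

# Sketch only: how the split parameters and device type map onto the batch call.
device = "/dev/nvme0n1"  # hypothetical example device
split_device = True      # hypothetical: request a split add
split_count = 4          # hypothetical: four OSDs on this one device

if split_device and split_count > 1:
    split_flag = f"--osds-per-device {split_count}"
else:
    split_flag = ""

if "nvme" in device:
    class_flag = "--crush-device-class nvme"
else:
    class_flag = "--crush-device-class ssd"

# Prepares (but does not yet activate) the OSD(s); activation, CRUSH placement
# and Zookeeper registration follow in add_osd() below.
command = f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}"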
@@ -26,6 +26,7 @@ import daemon_lib.common as common

from distutils.util import strtobool
from re import search, match, sub
from json import loads as jloads


def get_detect_device(detect_string):
@@ -260,7 +261,15 @@ class CephOSDInstance(object):

@staticmethod
def add_osd(
zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
zkhandler,
logger,
node,
device,
weight,
ext_db_flag=False,
ext_db_ratio=0.05,
split_device=False,
split_count=1,
):
# Handle a detect device if that is passed
if match(r"detect:", device):
@@ -278,177 +287,185 @@ class CephOSDInstance(object):
)
device = ddevice

# We are ready to create a new OSD on this node
logger.out("Creating new OSD disk on block device {}".format(device), state="i")
if split_device and split_count > 1:
split_flag = f"--osds-per-device {split_count}"
logger.out(
f"Creating {split_count} new OSD disks on block device {device}",
state="i",
)
else:
split_flag = ""
logger.out(f"Creating 1 new OSD disk on block device {device}", state="i")

if "nvme" in device:
class_flag = "--crush-device-class nvme"
else:
class_flag = "--crush-device-class ssd"

try:
# 1. Create an OSD; we do this so we know what ID will be gen'd
retcode, stdout, stderr = common.run_os_command("ceph osd create")
if retcode:
print("ceph osd create")
print(stdout)
print(stderr)
raise Exception
osd_id = stdout.rstrip()

# 2. Remove that newly-created OSD
# 1. Zap the block device
logger.out(f"Zapping disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd rm {}".format(osd_id)
f"ceph-volume lvm zap --destroy {device}"
)
if retcode:
print("ceph osd rm")
print(stdout)
print(stderr)
logger.out("Failed: ceph-volume lvm zap", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

# 3a. Zap the disk to ensure it is ready to go
logger.out("Zapping disk {}".format(device), state="i")
# 2. Prepare the OSD(s)
logger.out(f"Preparing OSD(s) on disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm zap --destroy {}".format(device)
f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}"
)
if retcode:
print("ceph-volume lvm zap")
print(stdout)
print(stderr)
logger.out("Failed: ceph-volume lvm batch", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

dev_flags = "--data {}".format(device)
# 3. Get the list of created OSDs on the device (initial pass)
logger.out(f"Querying OSD(s) on disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list --format json {device}"
)
if retcode:
logger.out("Failed: ceph-volume lvm list", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

# 3b. Prepare the logical volume if ext_db_flag
created_osds = jloads(stdout)

# 4. Prepare the WAL and DB devices
if ext_db_flag:
_, osd_size_bytes, _ = common.run_os_command(
"blockdev --getsize64 {}".format(device)
for created_osd in created_osds:
# 4a. Get the OSD FSID and ID from the details
osd_details = created_osds[created_osd][0]
osd_fsid = osd_details["tags"]["ceph.osd_fsid"]
osd_id = osd_details["tags"]["ceph.osd_id"]
osd_lv = osd_details["lv_path"]

logger.out(
f"Creating Bluestore DB volume for OSD {osd_id}", state="i"
)

# 4b. Prepare the logical volume if ext_db_flag
_, osd_size_bytes, _ = common.run_os_command(
f"blockdev --getsize64 {osd_lv}"
)
osd_size_bytes = int(osd_size_bytes)
result = CephOSDInstance.create_osd_db_lv(
zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes
)
if not result:
raise Exception
db_device = "osd-db/osd-{}".format(osd_id)

# 4c. Attach the new DB device to the OSD
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}"
)
if retcode:
logger.out("Failed: ceph-volume lvm new-db", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

# 4d. Get the list of created OSDs on the device (final pass)
logger.out(f"Requerying OSD(s) on disk {device}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list --format json {device}"
)
osd_size_bytes = int(osd_size_bytes)
result = CephOSDInstance.create_osd_db_lv(
zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes
)
if not result:
if retcode:
logger.out("Failed: ceph-volume lvm list", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception
db_device = "osd-db/osd-{}".format(osd_id)
dev_flags += " --block.db {}".format(db_device)
else:
db_device = ""

# 3c. Create the OSD for real
logger.out(
"Preparing LVM for new OSD disk with ID {} on {}".format(
osd_id, device
),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm prepare --bluestore {devices}".format(
devices=dev_flags
created_osds = jloads(stdout)

# 5. Activate the OSDs
logger.out(f"Activating OSD(s) on disk {device}", state="i")
for created_osd in created_osds:
# 5a. Get the OSD FSID and ID from the details
osd_details = created_osds[created_osd][0]
osd_clusterfsid = osd_details["tags"]["ceph.cluster_fsid"]
osd_fsid = osd_details["tags"]["ceph.osd_fsid"]
osd_id = osd_details["tags"]["ceph.osd_id"]
db_device = osd_details["tags"].get("ceph.db_device", None)
osd_vg = osd_details["vg_name"]
osd_lv = osd_details["lv_name"]

# 5b. Activate the OSD
logger.out(f"Activating OSD {osd_id}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}"
)
)
if retcode:
print("ceph-volume lvm prepare")
print(stdout)
print(stderr)
raise Exception
if retcode:
logger.out("Failed: ceph-volume lvm activate", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

# 4a. Get OSD information
logger.out(
"Getting OSD information for ID {} on {}".format(osd_id, device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm list {device}".format(device=device)
)
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]

if not osd_fsid:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
raise Exception

# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")

# Reset whatever we were given to Ceph's /dev/xdX naming
if device != osd_device:
device = osd_device

# 4b. Activate the OSD
logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
osdid=osd_id, osdfsid=osd_fsid
# 5c. Add it to the crush map
logger.out(f"Adding OSD {osd_id} to CRUSH map", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}"
)
)
if retcode:
print("ceph-volume lvm activate")
print(stdout)
print(stderr)
raise Exception
if retcode:
logger.out("Failed: ceph osd crush add", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

# 5. Add it to the crush map
logger.out(
"Adding new OSD disk with ID {} to CRUSH map".format(osd_id), state="i"
)
retcode, stdout, stderr = common.run_os_command(
"ceph osd crush add osd.{osdid} {weight} root=default host={node}".format(
osdid=osd_id, weight=weight, node=node
# 5d. Wait half a second for it to activate
time.sleep(0.5)

# 5e. Verify it started
retcode, stdout, stderr = common.run_os_command(
"systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
)
)
if retcode:
print("ceph osd crush add")
print(stdout)
print(stderr)
raise Exception
if retcode:
logger.out(f"Failed: OSD {osd_id} unit is not active", state="e")
logger.out(stdout, state="d")
logger.out(stderr, state="d")
raise Exception

time.sleep(0.5)
# 5f. Add the new OSD to PVC
logger.out(f"Adding OSD {osd_id} to PVC", state="i")
zkhandler.write(
[
(("osd", osd_id), ""),
(("osd.node", osd_id), node),
(("osd.device", osd_id), device),
(("osd.db_device", osd_id), db_device),
(("osd.fsid", osd_id), osd_fsid),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(("osd.is_split", osd_id), split_flag),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
)

# 6. Verify it started
retcode, stdout, stderr = common.run_os_command(
"systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
)
if retcode:
print("systemctl status")
print(stdout)
print(stderr)
raise Exception

# 7. Add the new OSD to the list
# 6. Log it
logger.out(
"Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
f"Successfully created {split_count} new OSD(s) {','.join(created_osds.keys())} on disk {device}",
state="o",
)
zkhandler.write(
[
(("osd", osd_id), ""),
(("osd.node", osd_id), node),
(("osd.device", osd_id), device),
(("osd.db_device", osd_id), db_device),
(("osd.fsid", osd_id), ""),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
)

# Log it
logger.out("Created new OSD disk with ID {}".format(osd_id), state="o")
return True
except Exception as e:
# Log it
logger.out("Failed to create new OSD disk: {}".format(e), state="e")
logger.out(
f"Failed to create {split_count} new OSD(s) on disk {device}: {e}",
state="e",
)
return False

@staticmethod
@@ -828,7 +845,7 @@ class CephOSDInstance(object):

@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
logger.out("Removing OSD disk {}".format(osd_id), state="i")
logger.out("Removing OSD {}".format(osd_id), state="i")
try:
# Verify the OSD is present
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
@@ -843,7 +860,7 @@ class CephOSDInstance(object):
return True

# 1. Set the OSD down and out so it will flush
logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
logger.out("Setting down OSD {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd down {}".format(osd_id)
)
@@ -856,7 +873,7 @@ class CephOSDInstance(object):
else:
raise Exception

logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
logger.out("Setting out OSD {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd out {}".format(osd_id)
)
@@ -881,7 +898,7 @@ class CephOSDInstance(object):
time.sleep(5)

# 3. Stop the OSD process and wait for it to be terminated
logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
logger.out("Stopping OSD {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"systemctl stop ceph-osd@{}".format(osd_id)
)
@@ -922,25 +939,8 @@ class CephOSDInstance(object):
else:
raise Exception

# 5. Zap the volumes
logger.out(
"Zapping OSD {} disk on {}".format(osd_id, osd_device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm zap --destroy {}".format(osd_device)
)
if retcode:
print("ceph-volume lvm zap")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception

# 6. Purge the OSD from Ceph
logger.out("Purging OSD disk with ID {}".format(osd_id), state="i")
# 5. Purge the OSD from Ceph
logger.out("Purging OSD {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd purge {} --yes-i-really-mean-it".format(osd_id)
)
@@ -964,19 +964,15 @@ class CephOSDInstance(object):
)

# 8. Delete OSD from ZK
logger.out(
"Deleting OSD disk with ID {} from Zookeeper".format(osd_id), state="i"
)
logger.out("Deleting OSD {} from Zookeeper".format(osd_id), state="i")
zkhandler.delete(("osd", osd_id), recursive=True)

# Log it
logger.out("Removed OSD disk with ID {}".format(osd_id), state="o")
logger.out("Successfully removed OSD {}".format(osd_id), state="o")
return True
except Exception as e:
# Log it
logger.out(
"Failed to purge OSD disk with ID {}: {}".format(osd_id, e), state="e"
)
logger.out("Failed to remove OSD {}: {}".format(osd_id, e), state="e")
return False

@staticmethod
@@ -1245,16 +1241,34 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):

# Adding a new OSD
if command == "osd_add":
node, device, weight, ext_db_flag, ext_db_ratio = args.split(",")
(
node,
device,
weight,
ext_db_flag,
ext_db_ratio,
split_flag,
split_count,
) = args.split(",")
ext_db_flag = bool(strtobool(ext_db_flag))
ext_db_ratio = float(ext_db_ratio)
split_flag = bool(strtobool(split_flag))
split_count = int(split_count)
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock("base.cmd.ceph")
with zk_lock:
# Add the OSD
result = CephOSDInstance.add_osd(
zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio
zkhandler,
logger,
node,
device,
weight,
ext_db_flag,
ext_db_ratio,
split_flag,
split_count,
)
# Command succeeded
if result:
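For illustration, the osd_add command's args value is a comma-joined string that this parser now expects to carry seven fields. A hypothetical example payload (the values are made up, not taken from the commit):

# Hypothetical osd_add payload:
#   node, device, weight, ext_db_flag, ext_db_ratio, split_flag, split_count
args = "hv1,/dev/nvme0n1,1.0,False,0.05,True,4"
node, device, weight, ext_db_flag, ext_db_ratio, split_flag, split_count = args.split(",")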