Add support for replacing/refreshing OSDs
Adds commands to both replace an OSD disk, and refresh (reimport) an existing OSD disk on a new node. This handles the cases where an OSD disk should be replaced (either due to upgrades or failures) or where a node is rebuilt in-place and an existing OSD must be re-imported to it. This should avoid the need to do a full remove/add sequence for either case. Also cleans up some aspects of OSD removal that are identical between methods (e.g. using safe-to-destroy and sleeping after stopping) and fixes a bug if an OSD does not truly exist when the daemon starts up.
This commit is contained in:
@ -21,7 +21,6 @@
|
||||
|
||||
import time
|
||||
import json
|
||||
import psutil
|
||||
|
||||
import daemon_lib.common as common
|
||||
|
||||
@ -217,6 +216,10 @@ class CephOSDInstance(object):
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
f"ceph-volume lvm list {find_device}"
|
||||
)
|
||||
osd_blockdev = None
|
||||
osd_fsid = None
|
||||
osd_clusterfsid = None
|
||||
osd_device = None
|
||||
for line in stdout.split("\n"):
|
||||
if "block device" in line:
|
||||
osd_blockdev = line.split()[-1]
|
||||
@ -227,7 +230,7 @@ class CephOSDInstance(object):
|
||||
if "devices" in line:
|
||||
osd_device = line.split()[-1]
|
||||
|
||||
if not osd_fsid:
|
||||
if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
|
||||
self.logger.out(
|
||||
f"Failed to find updated OSD information via ceph-volume for {find_device}",
|
||||
state="e",
|
||||
@ -404,6 +407,7 @@ class CephOSDInstance(object):
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise Exception
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# 6. Verify it started
|
||||
@ -447,6 +451,381 @@ class CephOSDInstance(object):
|
||||
logger.out("Failed to create new OSD disk: {}".format(e), state="e")
|
||||
return False
|
||||
|
||||
@staticmethod
def replace_osd(
    zkhandler,
    logger,
    node,
    osd_id,
    old_device,
    new_device,
    weight,
    ext_db_flag=False,
):
    """
    Replace the disk behind an existing OSD with a new block device, keeping the OSD ID.

    The OSD is marked down and out, stopped and destroyed in Ceph, then the new
    device is zapped, prepared and activated under the same ID, and the OSD's
    Zookeeper entries are rewritten.

    Args:
        zkhandler: Zookeeper handler; only its write() method is used here.
        logger: Logger exposing out(message, state=...).
        node: Node name stored under the "osd.node" key.
        osd_id: ID of the OSD to replace (compared as a string against "ceph osd ls").
        old_device: Accepted for interface compatibility; not used by this method.
        new_device: Replacement block device, or a "detect:" string which is
            resolved through get_detect_device().
        weight: CRUSH weight passed to "ceph osd crush reweight".
        ext_db_flag: When true, "osd-db/osd-<osd_id>" is passed as --block.db.

    Returns:
        True when every step succeeded, False otherwise. Failures are logged,
        never raised to the caller.
    """

    def run_step(label, command):
        # Run one OS command; on a non-zero return code dump the output and raise
        # with a message so the final failure log line says which step broke.
        retcode, stdout, stderr = common.run_os_command(command)
        if retcode:
            print(label)
            print(stdout)
            print(stderr)
            raise Exception(f"'{label}' failed with return code {retcode}")
        return stdout, stderr

    # Handle a detect device if that is passed
    if match(r"detect:", new_device):
        ddevice = get_detect_device(new_device)
        if ddevice is None:
            logger.out(
                f"Failed to determine block device from detect string {new_device}",
                state="e",
            )
            return False
        logger.out(
            f"Determined block device {ddevice} from detect string {new_device}",
            state="i",
        )
        new_device = ddevice

    # We are ready to replace the OSD disk on this node
    logger.out(
        "Replacing OSD {} disk with block device {}".format(osd_id, new_device),
        state="i",
    )
    try:
        # Verify the OSD is present
        retcode, stdout, stderr = common.run_os_command("ceph osd ls")
        if str(osd_id) not in stdout.split():
            logger.out(
                "Could not find OSD {} in the cluster".format(osd_id), state="e"
            )
            # Nothing was replaced, so this must be reported as a failure
            return False

        # 1. Set the OSD down and out so it will flush
        logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
        run_step("ceph osd down", "ceph osd down {}".format(osd_id))

        logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
        run_step("ceph osd out", "ceph osd out {}".format(osd_id))

        # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
        logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
        while True:
            retcode, stdout, stderr = common.run_os_command(
                f"ceph osd safe-to-destroy osd.{osd_id}"
            )
            # Code 0 = success
            # Code 11 = "Error EAGAIN: OSD(s) 5 have no reported stats, and not all
            # PGs are active+clean; we cannot draw any conclusions." which means all
            # PGs have been remapped but backfill is still occurring
            if retcode in [0, 11]:
                break
            time.sleep(5)

        # 3. Stop the OSD process
        logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
        run_step("systemctl stop", "systemctl stop ceph-osd@{}".format(osd_id))
        time.sleep(2)

        # 4. Destroy the OSD
        logger.out(f"Destroying OSD with ID {osd_id}", state="i")
        run_step(
            "ceph osd destroy",
            f"ceph osd destroy {osd_id} --yes-i-really-mean-it",
        )

        # 5. Adjust the weight
        logger.out(
            "Adjusting weight of OSD disk with ID {} in CRUSH map".format(osd_id),
            state="i",
        )
        run_step(
            "ceph osd crush reweight",
            "ceph osd crush reweight osd.{osdid} {weight}".format(
                osdid=osd_id, weight=weight
            ),
        )

        # 6a. Zap the new disk to ensure it is ready to go
        logger.out("Zapping disk {}".format(new_device), state="i")
        run_step(
            "ceph-volume lvm zap",
            "ceph-volume lvm zap --destroy {}".format(new_device),
        )

        dev_flags = "--data {}".format(new_device)

        # 6b. Prepare the logical volume if ext_db_flag
        if ext_db_flag:
            db_device = "osd-db/osd-{}".format(osd_id)
            dev_flags += " --block.db {}".format(db_device)
        else:
            db_device = ""

        # 6c. Replace the OSD
        logger.out(
            "Preparing LVM for replaced OSD {} disk on {}".format(
                osd_id, new_device
            ),
            state="i",
        )
        run_step(
            "ceph-volume lvm prepare",
            "ceph-volume lvm prepare --osd-id {osdid} --bluestore {devices}".format(
                osdid=osd_id, devices=dev_flags
            ),
        )

        # 7a. Get OSD information
        logger.out(
            "Getting OSD information for ID {} on {}".format(osd_id, new_device),
            state="i",
        )
        retcode, stdout, stderr = common.run_os_command(
            "ceph-volume lvm list {device}".format(device=new_device)
        )
        # Start from None so a missing field is detected below instead of
        # surfacing as an UnboundLocalError
        osd_blockdev = None
        osd_fsid = None
        osd_clusterfsid = None
        osd_device = None
        for line in stdout.split("\n"):
            if "block device" in line:
                osd_blockdev = line.split()[-1]
            if "osd fsid" in line:
                osd_fsid = line.split()[-1]
            if "cluster fsid" in line:
                osd_clusterfsid = line.split()[-1]
            if "devices" in line:
                osd_device = line.split()[-1]

        if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
            print("ceph-volume lvm list")
            print("Could not find OSD information in data:")
            print(stdout)
            print(stderr)
            raise Exception(
                f"could not find OSD information via ceph-volume for {new_device}"
            )

        # Split OSD blockdev into VG and LV components
        # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
        _, _, osd_vg, osd_lv = osd_blockdev.split("/")

        # Reset whatever we were given to Ceph's /dev/xdX naming
        new_device = osd_device

        # 7b. Activate the OSD
        logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
        run_step(
            "ceph-volume lvm activate",
            "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                osdid=osd_id, osdfsid=osd_fsid
            ),
        )

        time.sleep(0.5)

        # 8. Verify it started
        run_step(
            "systemctl status",
            "systemctl status ceph-osd@{osdid}".format(osdid=osd_id),
        )

        # 9. Update Zookeeper information
        logger.out(
            "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
        )
        zkhandler.write(
            [
                (("osd", osd_id), ""),
                (("osd.node", osd_id), node),
                (("osd.device", osd_id), new_device),
                (("osd.db_device", osd_id), db_device),
                (("osd.fsid", osd_id), ""),
                (("osd.ofsid", osd_id), osd_fsid),
                (("osd.cfsid", osd_id), osd_clusterfsid),
                (("osd.lvm", osd_id), ""),
                (("osd.vg", osd_id), osd_vg),
                (("osd.lv", osd_id), osd_lv),
                (
                    ("osd.stats", osd_id),
                    '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
                ),
            ]
        )

        # Log it
        logger.out(
            "Replaced OSD {} disk with device {}".format(osd_id, new_device),
            state="o",
        )
        return True
    except Exception as e:
        # Log it
        logger.out("Failed to replace OSD {} disk: {}".format(osd_id, e), state="e")
        return False
|
||||
|
||||
@staticmethod
def refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag):
    """
    Re-import an existing OSD from a block device and rewrite its Zookeeper entries.

    The OSD's details are read from "ceph-volume lvm list <device>", the OSD is
    activated with "ceph-volume lvm activate", its systemd unit status is
    checked, and the OSD's Zookeeper keys are written.

    Args:
        zkhandler: Zookeeper handler; only its write() method is used here.
        logger: Logger exposing out(message, state=...).
        node: Node name stored under the "osd.node" key.
        osd_id: ID of the OSD to refresh (compared as a string against "ceph osd ls").
        device: Block device holding the OSD, or a "detect:" string which is
            resolved through get_detect_device().
        ext_db_flag: When true, "osd-db/osd-<osd_id>" is recorded as the DB device.

    Returns:
        True when every step succeeded, False otherwise. Failures are logged,
        never raised to the caller.
    """

    def run_step(label, command):
        # Run one OS command; on a non-zero return code dump the output and raise
        # with a message so the final failure log line says which step broke.
        retcode, stdout, stderr = common.run_os_command(command)
        if retcode:
            print(label)
            print(stdout)
            print(stderr)
            raise Exception(f"'{label}' failed with return code {retcode}")
        return stdout, stderr

    # Handle a detect device if that is passed
    if match(r"detect:", device):
        ddevice = get_detect_device(device)
        if ddevice is None:
            logger.out(
                f"Failed to determine block device from detect string {device}",
                state="e",
            )
            return False
        logger.out(
            f"Determined block device {ddevice} from detect string {device}",
            state="i",
        )
        device = ddevice

    # We are ready to refresh the existing OSD on this node
    logger.out(
        "Refreshing OSD {} disk on block device {}".format(osd_id, device),
        state="i",
    )
    try:
        # 1. Verify the OSD is present
        retcode, stdout, stderr = common.run_os_command("ceph osd ls")
        if str(osd_id) not in stdout.split():
            logger.out(
                "Could not find OSD {} in the cluster".format(osd_id), state="e"
            )
            # Nothing was refreshed, so this must be reported as a failure
            return False

        # Only the DB device name is recorded; no volume is prepared here
        if ext_db_flag:
            db_device = "osd-db/osd-{}".format(osd_id)
        else:
            db_device = ""

        # 2. Get OSD information
        logger.out(
            "Getting OSD information for ID {} on {}".format(osd_id, device),
            state="i",
        )
        retcode, stdout, stderr = common.run_os_command(
            "ceph-volume lvm list {device}".format(device=device)
        )
        # Start from None so a missing field is detected below instead of
        # surfacing as an UnboundLocalError
        osd_blockdev = None
        osd_fsid = None
        osd_clusterfsid = None
        osd_device = None
        for line in stdout.split("\n"):
            if "block device" in line:
                osd_blockdev = line.split()[-1]
            if "osd fsid" in line:
                osd_fsid = line.split()[-1]
            if "cluster fsid" in line:
                osd_clusterfsid = line.split()[-1]
            if "devices" in line:
                osd_device = line.split()[-1]

        if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
            print("ceph-volume lvm list")
            print("Could not find OSD information in data:")
            print(stdout)
            print(stderr)
            raise Exception(
                f"could not find OSD information via ceph-volume for {device}"
            )

        # Split OSD blockdev into VG and LV components
        # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
        _, _, osd_vg, osd_lv = osd_blockdev.split("/")

        # Reset whatever we were given to Ceph's /dev/xdX naming
        device = osd_device

        # 3. Activate the OSD
        logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
        run_step(
            "ceph-volume lvm activate",
            "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                osdid=osd_id, osdfsid=osd_fsid
            ),
        )

        time.sleep(0.5)

        # 4. Verify it started
        run_step(
            "systemctl status",
            "systemctl status ceph-osd@{osdid}".format(osdid=osd_id),
        )

        # 5. Update Zookeeper information
        logger.out(
            "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
        )
        zkhandler.write(
            [
                (("osd", osd_id), ""),
                (("osd.node", osd_id), node),
                (("osd.device", osd_id), device),
                (("osd.db_device", osd_id), db_device),
                (("osd.fsid", osd_id), ""),
                (("osd.ofsid", osd_id), osd_fsid),
                (("osd.cfsid", osd_id), osd_clusterfsid),
                (("osd.lvm", osd_id), ""),
                (("osd.vg", osd_id), osd_vg),
                (("osd.lv", osd_id), osd_lv),
                (
                    ("osd.stats", osd_id),
                    '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
                ),
            ]
        )

        # Log it
        logger.out("Refreshed OSD {} disk on {}".format(osd_id, device), state="o")
        return True
    except Exception as e:
        # Log it
        logger.out("Failed to refresh OSD {} disk: {}".format(osd_id, e), state="e")
        return False
|
||||
|
||||
@staticmethod
|
||||
def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
|
||||
logger.out("Removing OSD disk {}".format(osd_id), state="i")
|
||||
@ -490,29 +869,16 @@ class CephOSDInstance(object):
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
# 2. Wait for the OSD to flush
|
||||
logger.out("Flushing OSD disk with ID {}".format(osd_id), state="i")
|
||||
osd_string = str()
|
||||
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
|
||||
logger.out("Waiting for OSD {osd_id} to be safe to remove", state="i")
|
||||
while True:
|
||||
try:
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
"ceph pg dump osds --format json"
|
||||
)
|
||||
dump_string = json.loads(stdout)
|
||||
for osd in dump_string:
|
||||
if str(osd["osd"]) == osd_id:
|
||||
osd_string = osd
|
||||
num_pgs = osd_string["num_pgs"]
|
||||
if num_pgs > 0:
|
||||
time.sleep(5)
|
||||
else:
|
||||
if force_flag:
|
||||
logger.out("Ignoring error due to force flag", state="i")
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
except Exception:
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
f"ceph osd safe-to-destroy osd.{osd_id}"
|
||||
)
|
||||
if int(retcode) in [0, 11]:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
# 3. Stop the OSD process and wait for it to be terminated
|
||||
logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
|
||||
@ -527,19 +893,7 @@ class CephOSDInstance(object):
|
||||
logger.out("Ignoring error due to force flag", state="i")
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
# FIXME: There has to be a better way to do this /shrug
|
||||
while True:
|
||||
is_osd_up = False
|
||||
# Find if there is a process named ceph-osd with arg '--id {id}'
|
||||
for p in psutil.process_iter(attrs=["name", "cmdline"]):
|
||||
if "ceph-osd" == p.info["name"] and "--id {}".format(
|
||||
osd_id
|
||||
) in " ".join(p.info["cmdline"]):
|
||||
is_osd_up = True
|
||||
# If there isn't, continue
|
||||
if not is_osd_up:
|
||||
break
|
||||
time.sleep(2)
|
||||
|
||||
# 4. Determine the block devices
|
||||
osd_vg = zkhandler.read(("osd.vg", osd_id))
|
||||
@ -912,6 +1266,59 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
|
||||
# Wait 1 second before we free the lock, to ensure the client hits the lock
|
||||
time.sleep(1)
|
||||
|
||||
# Replacing an OSD
|
||||
if command == "osd_replace":
|
||||
node, osd_id, old_device, new_device, weight, ext_db_flag = args.split(",")
|
||||
ext_db_flag = bool(strtobool(ext_db_flag))
|
||||
if node == this_node.name:
|
||||
# Lock the command queue
|
||||
zk_lock = zkhandler.writelock("base.cmd.ceph")
|
||||
with zk_lock:
|
||||
# Replace the OSD
|
||||
result = CephOSDInstance.replace_osd(
|
||||
zkhandler,
|
||||
logger,
|
||||
node,
|
||||
osd_id,
|
||||
old_device,
|
||||
new_device,
|
||||
weight,
|
||||
ext_db_flag,
|
||||
)
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
|
||||
# Command failed
|
||||
else:
|
||||
# Update the command queue
|
||||
zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
|
||||
# Wait 1 second before we free the lock, to ensure the client hits the lock
|
||||
time.sleep(1)
|
||||
|
||||
# Refreshing an OSD
|
||||
if command == "osd_refresh":
|
||||
node, osd_id, device, ext_db_flag = args.split(",")
|
||||
ext_db_flag = bool(strtobool(ext_db_flag))
|
||||
if node == this_node.name:
|
||||
# Lock the command queue
|
||||
zk_lock = zkhandler.writelock("base.cmd.ceph")
|
||||
with zk_lock:
|
||||
# Refresh the OSD
|
||||
result = CephOSDInstance.refresh_osd(
|
||||
zkhandler, logger, node, osd_id, device, ext_db_flag
|
||||
)
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
|
||||
# Command failed
|
||||
else:
|
||||
# Update the command queue
|
||||
zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
|
||||
# Wait 1 second before we free the lock, to ensure the client hits the lock
|
||||
time.sleep(1)
|
||||
|
||||
# Removing an OSD
|
||||
elif command == "osd_remove":
|
||||
osd_id, force = args.split(",")
|
||||
|
Reference in New Issue
Block a user