#!/usr/bin/env python3
# ceph.py - PVC client function library, Ceph cluster functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import re
import json
import time
import math
from concurrent.futures import ThreadPoolExecutor
import daemon_lib.vm as vm
import daemon_lib.common as common
#
# Supplemental functions
#
# Verify OSD is valid in cluster
def verifyOSD(zkhandler, osd_id):
return zkhandler.exists(("osd", osd_id))
# Verify Pool is valid in cluster
def verifyPool(zkhandler, name):
return zkhandler.exists(("pool", name))
# Verify Volume is valid in cluster
def verifyVolume(zkhandler, pool, name):
return zkhandler.exists(("volume", f"{pool}/{name}"))
# Verify Snapshot is valid in cluster
def verifySnapshot(zkhandler, pool, volume, name):
return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}"))
# Verify OSD path is valid in cluster
def verifyOSDBlock(zkhandler, node, device):
for osd in zkhandler.children("base.osd"):
osd_node = zkhandler.read(("osd.node", osd))
osd_device = zkhandler.read(("osd.device", osd))
if node == osd_node and device == osd_device:
return osd
return None
# Matrix of human-to-byte values
byte_unit_matrix = {
"B": 1,
"K": 1024,
"M": 1024 * 1024,
"G": 1024 * 1024 * 1024,
"T": 1024 * 1024 * 1024 * 1024,
"P": 1024 * 1024 * 1024 * 1024 * 1024,
"E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Z": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Y": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"R": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Q": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
}
# Matrix of human-to-metric values
ops_unit_matrix = {
"": 1,
"K": 1000,
"M": 1000 * 1000,
"G": 1000 * 1000 * 1000,
"T": 1000 * 1000 * 1000 * 1000,
"P": 1000 * 1000 * 1000 * 1000 * 1000,
"E": 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Z": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Y": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"R": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Q": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
}
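# Note: byte values use binary (1024-based) multipliers while ops values use
# decimal (1000-based) multipliers, so "1K" means 1024 for a byte size but 1000
# for an ops count.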
# Format byte sizes to/from human-readable units
def format_bytes_tohuman(databytes):
datahuman = ""
for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get, reverse=True):
new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit]))
        # Skip this unit if the value would need 5 or more digits
        if new_bytes > 9999:
            continue
        else:
            # Keep this size; a smaller unit may still replace it if it also fits in 4 digits
            datahuman = "{}{}".format(new_bytes, unit)
return datahuman
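# The loop above walks from the largest unit down and keeps the smallest unit
# whose value still fits in four digits; for example, format_bytes_tohuman(4294967296)
# returns "4096M" rather than "4G".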
def format_bytes_fromhuman(datahuman):
if not re.search(r"[A-Za-z]+", datahuman):
dataunit = "B"
datasize = int(datahuman)
else:
dataunit = str(re.match(r"[0-9]+([A-Za-z])[iBb]*", datahuman).group(1))
datasize = int(re.match(r"([0-9]+)[A-Za-z]+", datahuman).group(1))
if byte_unit_matrix.get(dataunit):
databytes = datasize * byte_unit_matrix[dataunit]
return databytes
else:
return None
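# Examples: format_bytes_fromhuman("100G") returns 107374182400 and
# format_bytes_fromhuman("4096") returns 4096 (a bare number is treated as bytes);
# an unrecognized unit such as "100X" returns None.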
# Format ops sizes to/from human-readable units
def format_ops_tohuman(dataops):
datahuman = ""
for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get, reverse=True):
new_ops = int(math.ceil(dataops / ops_unit_matrix[unit]))
        # Skip this unit if the value would need 5 or more digits
        if new_ops > 9999:
            continue
        else:
            # Keep this size; a smaller unit may still replace it if it also fits in 4 digits
            datahuman = "{}{}".format(new_ops, unit)
return datahuman
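# Example: format_ops_tohuman(2500000) returns "2500K", applying the same
# four-digit rule as format_bytes_tohuman but with 1000-based units.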
def format_ops_fromhuman(datahuman):
    # Trim off the human-readable unit character, if any
    if datahuman[-1].isdigit():
        dataunit = ""
        datasize = int(datahuman)
    else:
        dataunit = datahuman[-1]
        datasize = int(datahuman[:-1])
    dataops = datasize * ops_unit_matrix[dataunit]
    return "{}".format(dataops)
def format_pct_tohuman(datapct):
datahuman = "{0:.1f}".format(float(datapct * 100.0))
return datahuman
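# Example: format_pct_tohuman(0.2567) returns "25.7" (fractional input, one
# decimal place of output).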
#
# Status functions
#
def get_status(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_status = zkhandler.read("base.storage").rstrip()
# Create a data structure for the information
status_data = {
"type": "status",
"primary_node": primary_node,
"ceph_data": ceph_status,
}
return True, status_data
def get_health(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_health = zkhandler.read("base.storage.health").rstrip()
# Create a data structure for the information
status_data = {
"type": "health",
"primary_node": primary_node,
"ceph_data": ceph_health,
}
return True, status_data
def get_util(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_df = zkhandler.read("base.storage.util").rstrip()
# Create a data structure for the information
status_data = {
"type": "utilization",
"primary_node": primary_node,
"ceph_data": ceph_df,
}
return True, status_data
#
# OSD functions
#
def getClusterOSDList(zkhandler):
    # Get a list of OSDs by listing the children of base.osd
return zkhandler.children("base.osd")
def getOSDInformation(zkhandler, osd_id):
    # Get the OSD's node, device, and DB device
osd_node = zkhandler.read(("osd.node", osd_id))
osd_device = zkhandler.read(("osd.device", osd_id))
osd_db_device = zkhandler.read(("osd.db_device", osd_id))
# Parse the stats data
osd_stats_raw = zkhandler.read(("osd.stats", osd_id))
osd_stats = dict(json.loads(osd_stats_raw))
osd_information = {
"id": osd_id,
"node": osd_node,
"device": osd_device,
"db_device": osd_db_device,
"stats": osd_stats,
}
return osd_information
# OSD DB VG actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
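# The pattern used here and by the OSD actions below is: write a command string
# to the base.cmd.ceph key, give the responsible node daemon a moment to pick it
# up, read the key back under a read lock and treat anything other than a
# "success-<action>" value as a failure, then clear the key under a write lock.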
def add_osd_db_vg(zkhandler, node, device):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
node
)
    # Tell the cluster to create a new OSD database volume group on the host
add_osd_db_vg_string = "db_vg_add {},{}".format(node, device)
zkhandler.write([("base.cmd.ceph", add_osd_db_vg_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-db_vg_add":
message = 'Created new OSD database VG at "{}" on node "{}".'.format(
device, node
)
success = True
else:
message = "ERROR: Failed to create new OSD database VG; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
# OSD actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
node
)
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, device)
if block_osd:
return (
False,
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
device, node, block_osd
),
)
# Tell the cluster to create a new OSD for the host
add_osd_string = "osd_add {},{},{},{},{}".format(
node, device, weight, ext_db_flag, ext_db_ratio
)
zkhandler.write([("base.cmd.ceph", add_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_add":
message = 'Created new OSD with block device "{}" on node "{}".'.format(
device, node
)
success = True
else:
message = (
"ERROR: Failed to create new OSD; check node logs for details."
)
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def replace_osd(zkhandler, osd_id, new_device, weight):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
old_device = osd_information["device"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, new_device)
if block_osd and block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
new_device, node, block_osd
),
)
    # Tell the cluster to replace the OSD
replace_osd_string = "osd_replace {},{},{},{},{},{}".format(
node, osd_id, old_device, new_device, weight, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", replace_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_replace":
message = 'Replaced OSD {} with block device "{}" on node "{}".'.format(
osd_id, new_device, node
)
success = True
else:
message = "ERROR: Failed to replace OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def refresh_osd(zkhandler, osd_id, device):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, device)
if not block_osd or block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format(
device, node, osd_id
),
)
    # Tell the cluster to refresh the OSD
refresh_osd_string = "osd_refresh {},{},{},{}".format(
node, osd_id, device, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", refresh_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_refresh":
message = (
'Refreshed OSD {} with block device "{}" on node "{}".'.format(
osd_id, device, node
)
)
success = True
else:
message = "ERROR: Failed to refresh OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def remove_osd(zkhandler, osd_id, force_flag):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
# Tell the cluster to remove an OSD
remove_osd_string = "osd_remove {},{}".format(osd_id, str(force_flag))
zkhandler.write([("base.cmd.ceph", remove_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_remove":
message = 'Removed OSD "{}" from the cluster.'.format(osd_id)
success = True
else:
message = "ERROR: Failed to remove OSD; check node logs for details."
success = False
except Exception:
success = False
message = "ERROR Command ignored by node."
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def in_osd(zkhandler, osd_id):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id))
if retcode:
return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr)
return True, "Set OSD {} online.".format(osd_id)
def out_osd(zkhandler, osd_id):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id))
if retcode:
return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr)
return True, "Set OSD {} offline.".format(osd_id)
def set_osd(zkhandler, option):
retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option))
if retcode:
return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr)
return True, 'Set OSD property "{}".'.format(option)
def unset_osd(zkhandler, option):
retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option))
if retcode:
return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr)
return True, 'Unset OSD property "{}".'.format(option)
def get_list_osd(zkhandler, limit, is_fuzzy=True):
osd_list = []
full_osd_list = zkhandler.children("base.osd")
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for osd in full_osd_list:
if limit:
try:
if re.fullmatch(limit, osd):
osd_list.append(getOSDInformation(zkhandler, osd))
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
osd_list.append(getOSDInformation(zkhandler, osd))
return True, sorted(osd_list, key=lambda x: int(x["id"]))
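# Example (illustrative): get_list_osd(zkhandler, "2") matches any OSD ID
# containing "2", because fuzzy limits are wrapped into the regex ".*2.*".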
#
# Pool functions
#
def getPoolInformation(zkhandler, pool):
# Parse the stats data
pool_stats_raw = zkhandler.read(("pool.stats", pool))
pool_stats = dict(json.loads(pool_stats_raw))
volume_count = len(getCephVolumes(zkhandler, pool))
tier = zkhandler.read(("pool.tier", pool))
if tier is None:
tier = "default"
pgs = zkhandler.read(("pool.pgs", pool))
pool_information = {
"name": pool,
"volume_count": volume_count,
"tier": tier,
"pgs": pgs,
"stats": pool_stats,
}
return pool_information
def add_pool(zkhandler, name, pgs, replcfg, tier=None):
# Prepare the copies/mincopies variables
try:
copies, mincopies = replcfg.split(",")
copies = int(copies.replace("copies=", ""))
mincopies = int(mincopies.replace("mincopies=", ""))
except Exception:
copies = None
mincopies = None
if not copies or not mincopies:
return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'
# Prepare the tiers if applicable
if tier is not None and tier in ["hdd", "ssd", "nvme"]:
crush_rule = f"{tier}_tier"
# Create a CRUSH rule for the relevant tier
retcode, stdout, stderr = common.run_os_command(
f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
)
if retcode:
return (
False,
f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
)
else:
tier = "default"
crush_rule = "replicated"
# Create the pool
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
)
if retcode:
return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'
# Set the size and minsize
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} size {copies}"
)
if retcode:
return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} min_size {mincopies}"
)
if retcode:
return (
False,
f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
)
# Enable RBD application
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool application enable {name} rbd"
)
if retcode:
return (
False,
            f'ERROR: Failed to enable RBD application on pool "{name}": {stderr}',
)
# Add the new pool to Zookeeper
zkhandler.write(
[
(("pool", name), ""),
(("pool.pgs", name), pgs),
(("pool.tier", name), tier),
(("pool.stats", name), "{}"),
(("volume", name), ""),
(("snapshot", name), ""),
]
)
return True, f'Created RBD pool "{name}" with {pgs} PGs'
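# Example (illustrative, hypothetical names): add_pool(zkhandler, "vms", 128,
# "copies=3,mincopies=2", tier="ssd") creates a 128-PG pool bound to an
# "ssd_tier" CRUSH rule with size 3 and min_size 2.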
def remove_pool(zkhandler, name):
if not verifyPool(zkhandler, name):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
name
)
# 1. Remove pool volumes
for volume in zkhandler.children(("volume", name)):
remove_volume(zkhandler, name, volume)
# 2. Remove the pool
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name)
)
if retcode:
return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr)
# 3. Delete pool from Zookeeper
zkhandler.delete(
[
("pool", name),
("volume", name),
("snapshot", name),
]
)
return True, 'Removed RBD pool "{}" and all volumes.'.format(name)
def set_pgs_pool(zkhandler, name, pgs):
if not verifyPool(zkhandler, name):
return False, f'ERROR: No pool with name "{name}" is present in the cluster.'
# Validate new PGs count
pgs = int(pgs)
if (pgs == 0) or (pgs & (pgs - 1) != 0):
return (
False,
f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
)
# Set the new pgs number
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pg_num {pgs}"
)
if retcode:
return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"
    # Also set the new pgp_num when the PG count is not increasing
current_pgs = int(zkhandler.read(("pool.pgs", name)))
if current_pgs >= pgs:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pgp_num {pgs}"
)
if retcode:
return (
False,
f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}",
)
# Update Zookeeper count
zkhandler.write(
[
(("pool.pgs", name), pgs),
]
)
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
def get_list_pool(zkhandler, limit, is_fuzzy=True):
full_pool_list = zkhandler.children("base.pool")
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_pool_info = dict()
for pool in full_pool_list:
is_limit_match = False
if limit:
try:
if re.fullmatch(limit, pool):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
is_limit_match = True
get_pool_info[pool] = True if is_limit_match else False
pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]]
pool_data_list = list()
with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor:
futures = []
for pool in pool_execute_list:
futures.append(executor.submit(getPoolInformation, zkhandler, pool))
for future in futures:
pool_data_list.append(future.result())
return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))
#
# Volume functions
#
def getCephVolumes(zkhandler, pool):
volume_list = list()
if not pool:
pool_list = zkhandler.children("base.pool")
else:
pool_list = [pool]
for pool_name in pool_list:
for volume_name in zkhandler.children(("volume", pool_name)):
volume_list.append("{}/{}".format(pool_name, volume_name))
return volume_list
def getVolumeInformation(zkhandler, pool, volume):
# Parse the stats data
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
# Format the size to something nicer
volume_stats["size"] = format_bytes_tohuman(volume_stats["size"])
volume_information = {"name": volume, "pool": pool, "stats": volume_stats}
return volume_information
def add_volume(zkhandler, pool, name, size):
# 1. Verify the size of the volume
pool_information = getPoolInformation(zkhandler, pool)
size_bytes = format_bytes_fromhuman(size)
if size_bytes is None:
return (
False,
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
)
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
return (
False,
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
)
# 2. Create the volume
retcode, stdout, stderr = common.run_os_command(
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
)
if retcode:
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr)
    # 3. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
    # 4. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("snapshot", f"{pool}/{name}"), ""),
]
)
return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
)
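# Example (illustrative, hypothetical names): add_volume(zkhandler, "vms",
# "test_disk0", "20G") runs "rbd create --size 21474836480B vms/test_disk0";
# the raw byte count is passed to rbd so no precision is lost to unit rounding.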
def clone_volume(zkhandler, pool, name_src, name_new):
if not verifyVolume(zkhandler, pool, name_src):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name_src, pool
)
# 1. Clone the volume
retcode, stdout, stderr = common.run_os_command(
"rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new)
)
if retcode:
return (
False,
'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format(
name_src, name_new, pool, stderr
),
)
# 2. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name_new)
)
volstats = stdout
# 3. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name_new}"), ""),
(("volume.stats", f"{pool}/{name_new}"), volstats),
(("snapshot", f"{pool}/{name_new}"), ""),
]
)
return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
name_src, name_new, pool
)
def resize_volume(zkhandler, pool, name, size):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Verify the size of the volume
pool_information = getPoolInformation(zkhandler, pool)
size_bytes = format_bytes_fromhuman(size)
if size_bytes is None:
return (
False,
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
)
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
return (
False,
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
)
# 2. Resize the volume
retcode, stdout, stderr = common.run_os_command(
"rbd resize --size {} {}/{}".format(
format_bytes_tohuman(size_bytes), pool, name
)
)
if retcode:
return (
False,
'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format(
name, format_bytes_tohuman(size_bytes), pool, stderr
),
)
# 3a. Determine the node running this VM if applicable
active_node = None
volume_vm_name = name.split("_")[0]
retcode, vm_info = vm.get_info(zkhandler, volume_vm_name)
if retcode:
for disk in vm_info["disks"]:
# This block device is present in this VM so we can continue
if disk["name"] == "{}/{}".format(pool, name):
active_node = vm_info["node"]
volume_id = disk["dev"]
# 3b. Perform a live resize in libvirt if the VM is running
if active_node is not None and vm_info.get("state", "") == "start":
import libvirt
# Run the libvirt command against the target host
try:
dest_lv = "qemu+tcp://{}/system".format(active_node)
target_lv_conn = libvirt.open(dest_lv)
target_vm_conn = target_lv_conn.lookupByName(vm_info["name"])
if target_vm_conn:
target_vm_conn.blockResize(
volume_id,
size_bytes,
libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
)
target_lv_conn.close()
except Exception:
pass
# 4. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
# 5. Update the volume in Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("snapshot", f"{pool}/{name}"), ""),
]
)
return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
)
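# Example (illustrative, hypothetical names): resize_volume(zkhandler, "vms",
# "testvm_disk0", "40G") resizes the RBD image and, because the volume name is
# prefixed with a VM name, also attempts a live blockResize through libvirt if
# that VM is currently running.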
def rename_volume(zkhandler, pool, name, new_name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Rename the volume
retcode, stdout, stderr = common.run_os_command(
"rbd rename {}/{} {}".format(pool, name, new_name)
)
if retcode:
return (
False,
'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format(
name, new_name, pool, stderr
),
)
# 2. Rename the volume in Zookeeper
zkhandler.rename(
[
(("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")),
(("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")),
]
)
# 3. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, new_name)
)
volstats = stdout
# 4. Update the volume stats in Zookeeper
zkhandler.write(
[
(("volume.stats", f"{pool}/{new_name}"), volstats),
]
)
return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
name, new_name, pool
)
def remove_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Remove volume snapshots
for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")):
remove_snapshot(zkhandler, pool, name, snapshot)
# 2. Remove the volume
retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name))
if retcode:
return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format(
name, pool, stderr
)
# 3. Delete volume from Zookeeper
zkhandler.delete(
[
("volume", f"{pool}/{name}"),
("snapshot", f"{pool}/{name}"),
]
)
return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
def map_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Map the volume onto the local system
retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name))
if retcode:
return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format(
name, pool, stderr
)
# 2. Calculate the absolute path to the mapped volume
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
# 3. Ensure the volume exists
if not os.path.exists(mapped_volume):
return (
False,
'ERROR: Mapped volume not found at expected location "{}".'.format(
mapped_volume
),
)
return True, mapped_volume
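# The path returned here ("/dev/rbd/<pool>/<name>") can be read from or written
# to directly by the caller; unmap_volume below releases the same mapping.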
def unmap_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
# 1. Ensure the volume exists
if not os.path.exists(mapped_volume):
return (
False,
'ERROR: Mapped volume not found at expected location "{}".'.format(
mapped_volume
),
)
    # 2. Unmap the volume
retcode, stdout, stderr = common.run_os_command(
"rbd unmap {}".format(mapped_volume)
)
if retcode:
return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format(
mapped_volume, stderr
)
return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)
def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
pool
)
full_volume_list = getCephVolumes(zkhandler, pool)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_volume_info = dict()
for volume in full_volume_list:
pool_name, volume_name = volume.split("/")
is_limit_match = False
# Check on limit
if limit:
# Try to match the limit against the volume name
try:
if re.fullmatch(limit, volume_name):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
is_limit_match = True
get_volume_info[volume] = True if is_limit_match else False
# Obtain our volume data in a thread pool
volume_execute_list = [
volume for volume in full_volume_list if get_volume_info[volume]
]
volume_data_list = list()
with ThreadPoolExecutor(
max_workers=32, thread_name_prefix="volume_list"
) as executor:
futures = []
for volume in volume_execute_list:
pool_name, volume_name = volume.split("/")
futures.append(
executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)
)
for future in futures:
volume_data_list.append(future.result())
return True, sorted(volume_data_list, key=lambda x: str(x["name"]))
#
# Snapshot functions
#
def getCephSnapshots(zkhandler, pool, volume):
    snapshot_list = list()
    volume_list = getCephVolumes(zkhandler, pool)
if volume:
for volume_entry in volume_list:
volume_pool, volume_name = volume_entry.split("/")
if volume_name == volume:
volume_list = ["{}/{}".format(volume_pool, volume_name)]
for volume_entry in volume_list:
for snapshot_name in zkhandler.children(("snapshot", volume_entry)):
snapshot_list.append("{}@{}".format(volume_entry, snapshot_name))
return snapshot_list
def add_snapshot(zkhandler, pool, volume, name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
# 1. Create the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap create {}/{}@{}".format(pool, volume, name)
)
if retcode:
return (
False,
'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
name, volume, pool, stderr
),
)
# 2. Add the snapshot to Zookeeper
zkhandler.write(
[
(("snapshot", f"{pool}/{volume}/{name}"), ""),
(("snapshot.stats", f"{pool}/{volume}/{name}"), "{}"),
]
)
# 3. Update the count of snapshots on this volume
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
    # Increment the snapshot count
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] + 1
volume_stats_raw = json.dumps(volume_stats)
zkhandler.write(
[
(("volume.stats", f"{pool}/{volume}"), volume_stats_raw),
]
)
return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
name, volume, pool
)
def rename_snapshot(zkhandler, pool, volume, name, new_name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
if not verifySnapshot(zkhandler, pool, volume, name):
return (
False,
'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
name, volume, pool
),
)
# 1. Rename the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format(
pool=pool, volume=volume, name=name, new_name=new_name
)
)
if retcode:
return (
False,
'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format(
name, new_name, volume, pool, stderr
),
)
# 2. Rename the snapshot in ZK
zkhandler.rename(
[
(
("snapshot", f"{pool}/{volume}/{name}"),
("snapshot", f"{pool}/{volume}/{new_name}"),
),
]
)
return (
True,
'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format(
name, new_name, volume, pool
),
)
def remove_snapshot(zkhandler, pool, volume, name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
if not verifySnapshot(zkhandler, pool, volume, name):
return (
False,
            'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
name, volume, pool
),
)
# 1. Remove the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap rm {}/{}@{}".format(pool, volume, name)
)
if retcode:
return (
False,
'Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
name, volume, pool, stderr
),
)
# 2. Delete snapshot from Zookeeper
zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")])
# 3. Update the count of snapshots on this volume
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
    # Decrement the snapshot count
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] - 1
volume_stats_raw = json.dumps(volume_stats)
zkhandler.write([(("volume.stats", f"{pool}/{volume}"), volume_stats_raw)])
return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
name, volume, pool
)
def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
snapshot_list = []
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
pool
)
    if volume and pool and not verifyVolume(zkhandler, pool, volume):
        return (
            False,
            'ERROR: No volume with name "{}" is present in pool "{}".'.format(
                volume, pool
            ),
        )
full_snapshot_list = getCephSnapshots(zkhandler, pool, volume)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for snapshot in full_snapshot_list:
volume, snapshot_name = snapshot.split("@")
pool_name, volume_name = volume.split("/")
if limit:
try:
if re.fullmatch(limit, snapshot_name):
snapshot_list.append(
{
"pool": pool_name,
"volume": volume_name,
"snapshot": snapshot_name,
}
)
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
snapshot_list.append(
{"pool": pool_name, "volume": volume_name, "snapshot": snapshot_name}
)
return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"]))