@@ -25,7 +25,8 @@ import importlib.util
 
 from os import walk
 from datetime import datetime
-from json import dumps
+from hashlib import sha1
+from json import dumps, loads
 from apscheduler.schedulers.background import BackgroundScheduler
 
 
@@ -197,6 +198,97 @@ class MonitoringInstance(object):
         self.logger = logger
         self.this_node = this_node
 
+        # Create functions for each fault type
+        def get_node_daemon_states():
+            return [
+                (node, self.zkhandler.read(("node.state.daemon", node)))
+                for node in self.zkhandler.children("base.node")
+            ]
+
+        def get_osd_out_states():
+            return [
+                (osd, loads(self.zkhandler.read(("osd.stats", osd))).get("out", 0))
+                for osd in self.zkhandler.children("base.osd")
+            ]
+
+        def get_ceph_health_entries():
+            return [
+                (value, key)
+                for key, value in loads(zkhandler.read("base.storage.health"))[
+                    "checks"
+                ].items()
+            ]
+
+        def get_vm_states():
+            return [
+                (
+                    self.zkhandler.read(("domain.name", domain)),
+                    self.zkhandler.read(("domain.state", domain)),
+                )
+                for domain in self.zkhandler.children("base.domain")
+            ]
+
+        def get_overprovisioned_memory():
+            all_nodes = self.zkhandler.children("base.node")
+            current_memory_provisioned = sum(
+                [
+                    int(self.zkhandler.read(("node.memory.allocated", node)))
+                    for node in all_nodes
+                ]
+            )
+            node_memory_totals = [
+                int(self.zkhandler.read(("node.memory.total", node)))
+                for node in all_nodes
+            ]
+            total_node_memory = sum(node_memory_totals)
+            most_node_memory = sorted(node_memory_totals)[-1]
+            available_node_memory = total_node_memory - most_node_memory
+
+            if current_memory_provisioned >= available_node_memory:
+                op_str = "overprovisioned"
+            else:
+                op_str = "ok"
+            return [
+                (
+                    f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)",
+                    op_str,
+                )
+            ]
+
+        # This is a list of all possible faults (cluster error messages) and their corresponding details
+        self.faults_map = {
+            "dead_or_fenced_node": {
+                "entries": get_node_daemon_states,
+                "conditions": ["dead", "fenced"],
+                "delta": 50,
+                "message": "Node {entry} was dead and/or fenced.",
+            },
+            "ceph_osd_out": {
+                "entries": get_osd_out_states,
+                "conditions": ["1"],
+                "delta": 25,
+                "message": "OSD {entry} was out.",
+            },
+            "ceph_err": {
+                "entries": get_ceph_health_entries,
+                "conditions": ["HEALTH_ERR"],
+                "delta": 50,
+                "message": "Ceph cluster reported ERR: {entry}",
+            },
+            "vm_failed": {
+                "entries": get_vm_states,
+                "conditions": ["fail"],
+                "delta": 10,
+                "message": "VM {entry} was failed.",
+            },
+            "memory_overprovisioned": {
+                "entries": get_overprovisioned_memory,
+                "conditions": ["overprovisioned"],
+                "delta": 25,
+                "message": "Cluster memory was overprovisioned {entry}",
+            },
+        }
+
         # Get a list of plugins from the plugin_directory
         plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
             2
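
Note: the get_overprovisioned_memory helper above compares total provisioned VM memory against N-1 capacity, i.e. the cluster's total memory minus its single largest node (what would remain after losing that node). A standalone sketch of the same arithmetic, using hypothetical node sizes rather than live Zookeeper reads:

    # Hypothetical values; the real helper reads these from Zookeeper
    node_memory_totals = [65536, 65536, 32768]  # per-node total memory, MB
    current_memory_provisioned = 140000         # sum of all VM allocations, MB

    total_node_memory = sum(node_memory_totals)                    # 163840
    most_node_memory = sorted(node_memory_totals)[-1]              # 65536
    available_node_memory = total_node_memory - most_node_memory   # 98304 (N-1)

    # 140000 >= 98304, so this cluster would be reported "overprovisioned"
    op_str = (
        "overprovisioned"
        if current_memory_provisioned >= available_node_memory
        else "ok"
    )
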
@@ -344,38 +436,133 @@ class MonitoringInstance(object):
             )
         )
 
-        self.run_plugins()
-        self.start_check_timer()
+        self.start_timer()
 
     def __del__(self):
         self.shutdown()
 
     def shutdown(self):
-        self.stop_check_timer()
+        self.stop_timer()
         self.run_cleanups()
         return
 
-    def start_check_timer(self):
-        check_interval = self.config["monitoring_interval"]
+    def start_timer(self):
+        check_interval = int(self.config["monitoring_interval"])
+
+        self.timer = BackgroundScheduler()
+        self.timer.add_job(
+            self.run_checks,
+            trigger="interval",
+            seconds=check_interval,
+        )
+
         self.logger.out(
             f"Starting monitoring check timer ({check_interval} second interval)",
             state="s",
         )
-        self.check_timer = BackgroundScheduler()
-        self.check_timer.add_job(
-            self.run_plugins,
-            trigger="interval",
-            seconds=check_interval,
-        )
-        self.check_timer.start()
+        self.timer.start()
 
-    def stop_check_timer(self):
+        self.run_faults()
+        self.run_plugins()
+
+    def stop_timer(self):
         try:
-            self.check_timer.shutdown()
             self.logger.out("Stopping monitoring check timer", state="s")
+            self.timer.shutdown()
         except Exception:
             self.logger.out("Failed to stop monitoring check timer", state="w")
 
+    def generate_fault(self, fault_time, fault_delta, fault_message):
+        # Generate a fault ID from the fault_message and fault_delta
+        fault_str = f"{fault_delta} {fault_message}"
+        fault_id = int(sha1(fault_str.encode("utf-8")).hexdigest(), 16) % (10**8)
+
+        self.logger.out(
+            f"Generating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
+        )
+
+        # If a fault already exists with this ID, just update the time
+        if not self.zkhandler.exists("base.faults"):
+            self.logger.out(
+                "Skipping fault reporting due to missing Zookeeper schemas", state="w"
+            )
+            return
+
+        if fault_id in self.zkhandler.children("base.faults", retval=[]):
+            self.zkhandler.write(
+                [
+                    (("faults.last_time", fault_id), str(fault_time)),
+                ]
+            )
+        # Otherwise, generate a new fault event
+        else:
+            self.zkhandler.write(
+                [
+                    (("faults.id", fault_id), ""),
+                    (("faults.first_time", fault_id), str(fault_time)),
+                    (("faults.last_time", fault_id), str(fault_time)),
+                    (("faults.ack_time", fault_id), ""),
+                    (("faults.status", fault_id), "new"),
+                    (("faults.delta", fault_id), fault_delta),
+                    (("faults.message", fault_id), fault_message),
+                ]
+            )
+
+    def run_faults(self):
+        if self.this_node.coordinator_state == "primary":
+            cst_colour = self.logger.fmt_green
+        elif self.this_node.coordinator_state == "secondary":
+            cst_colour = self.logger.fmt_blue
+        else:
+            cst_colour = self.logger.fmt_cyan
+
+        active_coordinator_state = self.this_node.coordinator_state
+
+        runtime_start = datetime.now()
+        self.logger.out(
+            "Starting monitoring fault check run",
+            state="t",
+        )
+
+        fault_count = 0
+        for fault_type in self.faults_map.keys():
+            fault_details = self.faults_map[fault_type]
+
+            entries = fault_details["entries"]()
+            for _entry in entries:
+                entry = _entry[0]
+                detail = _entry[1]
+                for condition in fault_details["conditions"]:
+                    if str(condition) in str(detail):
+                        fault_time = datetime.now()
+                        fault_delta = fault_details["delta"]
+                        fault_message = fault_details["message"].format(entry=entry)
+                        fault_count += 1
+                        self.generate_fault(fault_time, fault_delta, fault_message)
+
+        runtime_end = datetime.now()
+        runtime_delta = runtime_end - runtime_start
+        runtime = "{:0.02f}".format(runtime_delta.total_seconds())
+        if fault_count > 0:
+            fault_colour = self.logger.fmt_red
+        else:
+            fault_colour = self.logger.fmt_green
+
+        self.logger.out(
+            "{start_colour}{hostname} fault check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {fault_colour}{fault_count} faults{nofmt} in {runtime} seconds".format(
+                start_colour=self.logger.fmt_purple,
+                cst_colour=self.logger.fmt_bold + cst_colour,
+                fault_colour=fault_colour,
+                nofmt=self.logger.fmt_end,
+                hostname=self.config["node_hostname"],
+                starttime=runtime_start,
+                costate=active_coordinator_state,
+                fault_count=fault_count,
+                runtime=runtime,
+            ),
+            state="t",
+        )
+
     def run_plugin(self, plugin):
         time_start = datetime.now()
         try:
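
Note: the fault ID used by generate_fault above is deterministic, a SHA-1 of the delta and message truncated to eight decimal digits, so the same condition detected on every check run maps to the same fault and only refreshes faults.last_time rather than creating a duplicate entry. A minimal sketch of the derivation, using a hypothetical message:

    from hashlib import sha1

    fault_delta = 50
    fault_message = "Node hv1 was dead and/or fenced."  # hypothetical entry

    # Same scheme as generate_fault(): hash "<delta> <message>" and
    # reduce it to an 8-digit decimal ID
    fault_str = f"{fault_delta} {fault_message}"
    fault_id = int(sha1(fault_str.encode("utf-8")).hexdigest(), 16) % (10**8)
    print(fault_id)  # identical on every run for this delta/message pair
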
@@ -406,7 +593,7 @@ class MonitoringInstance(object):
 
         runtime_start = datetime.now()
         self.logger.out(
-            "Starting monitoring healthcheck run",
+            "Starting monitoring plugin check run",
             state="t",
         )
 
@@ -459,7 +646,7 @@ class MonitoringInstance(object):
             health_text = "N/A"
 
         self.logger.out(
-            "{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
+            "{start_colour}{hostname} plugin check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
                 start_colour=self.logger.fmt_purple,
                 cst_colour=self.logger.fmt_bold + cst_colour,
                 health_colour=health_colour,
@@ -494,3 +681,7 @@ class MonitoringInstance(object):
                 ),
             ]
         )
+
+    def run_checks(self):
+        self.run_faults()
+        self.run_plugins()
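
Note: the timer handling above follows the standard APScheduler interval pattern: start_timer schedules run_checks on a BackgroundScheduler (which fires jobs from a background thread), and stop_timer shuts it down. A minimal self-contained sketch of that pattern; the 15-second interval and print body are placeholders, not PVC's configuration:

    from time import sleep
    from apscheduler.schedulers.background import BackgroundScheduler

    def run_checks():
        print("running fault and plugin checks")

    # Mirrors start_timer(): schedule the check loop at a fixed interval
    timer = BackgroundScheduler()
    timer.add_job(run_checks, trigger="interval", seconds=15)
    timer.start()  # jobs now fire on a background thread

    try:
        sleep(60)  # main thread keeps the process alive
    finally:
        timer.shutdown()  # mirrors stop_timer()
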