Compare commits

...

7 Commits

SHA1 Message Date
caadafa80d Add PSU redundancy sensor check 2023-09-15 19:07:29 -04:00
479e156234 Run monitoring plugins once on startup 2023-09-15 17:53:16 -04:00
86830286f3 Adjust message printing to be on one line 2023-09-15 17:00:34 -04:00
4d51318a40 Make monitoring interval configurable 2023-09-15 16:54:51 -04:00
cba6f5be48 Fix wording of non-coordinator state 2023-09-15 16:51:04 -04:00
254303b9d4 Use coordinator_state instead of router_state
Makes it much clearer what this variable represents.
2023-09-15 16:47:56 -04:00
40b7d68853 Separate monitoring and move to 60s interval
Removes the dependency of the monitoring subsystem from the node
keepalives, and runs them at a 60s interval to avoid excessive backups
if a plugin takes too long.

Adds its own logs and related items as required.

Finally adds a new required argument to the run() of plugins, the
coordinator state, which can be used by a plugin to determine actions
based on whether the node is a primary, secondary, or non-coordinator.
2023-09-15 16:47:11 -04:00
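
For illustration only (not part of this changeset), a minimal hypothetical plugin sketch showing how the new run() argument can be used to branch on coordinator state; it follows the MonitoringPlugin API shown in the diffs below, and the plugin name and messages are placeholders:

#!/usr/bin/env python3
# example - hypothetical PVC monitoring plugin illustrating coordinator_state
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin

PLUGIN_NAME = "example"


class MonitoringPluginScript(MonitoringPlugin):
    def setup(self):
        # Nothing to prepare; returning None lets the plugin load normally
        pass

    def run(self, coordinator_state=None):
        # Report a different message depending on whether this node is the
        # "primary" coordinator, a "secondary" coordinator, or a "client"
        if coordinator_state == "primary":
            message = "Node is the primary coordinator"
        elif coordinator_state == "secondary":
            message = "Node is a secondary coordinator"
        else:
            message = "Node is a non-coordinator client"

        self.plugin_result.set_health_delta(0)
        self.plugin_result.set_message(message)
        return self.plugin_result

    def cleanup(self):
        pass
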
17 changed files with 282 additions and 57 deletions

View File

@@ -100,9 +100,11 @@ class MonitoringPluginScript(MonitoringPlugin):
self.disk_details = disk_details
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Re-run setup each time to ensure the disk details are current

View File

@@ -62,9 +62,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -61,9 +61,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -57,9 +57,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

node-daemon/plugins/psur Normal file (138 lines added)
View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# psur.py - PVC Monitoring example plugin for PSU Redundancy
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check IPMI for power supply redundancy status.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring plugin script and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
# of the role of each function is provided in context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "psur"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
def setup(self):
"""
setup(): Perform special setup steps during node daemon startup
This step is optional and should be used sparingly.
If you wish for the plugin to not load in certain conditions, do any checks here
and return a non-None failure message to indicate the error.
"""
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
_ipmitool_ret, _ipmitool_list, _ = run_os_command("ipmitool sdr type 'Power Supply'")
if _ipmitool_ret != 0:
return "Error running ipmitool command"
else:
search_values = [
"PS Redundancy", # Dell PowerEdge
"Power Supplies", # HP ProLiant
"PS_RDNDNT_MODE", # Cisco UCS
]
reading_lines = [l for l in _ipmitool_list.split('\n') if len(l.split('|')) > 0 and l.split('|')[0].strip() in search_values]
if len(reading_lines) < 1:
return "No valid input power sensors found"
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
health_delta = 0
messages = list()
_ipmitool_ret, _ipmitool_list, _ = run_os_command("ipmitool sdr type 'Power Supply'")
if _ipmitool_ret != 0 or len(_ipmitool_list.split('\n')) < 1:
health_delta = 0
messages.append("Error running ipmitool command")
else:
search_values = [
"PS Redundancy", # Dell PowerEdge
"Power Supplies", # HP ProLiant
"PS_RDNDNT_MODE", # Cisco UCS
]
reading_lines = [l for l in _ipmitool_list.split('\n') if len(l.split('|')) > 0 and l.split('|')[0].strip() in search_values]
if len(reading_lines) > 0:
for reading_line in reading_lines:
reading_sensor = reading_line.split('|')[1].strip()
reading_text = reading_line.split('|')[-1].strip()
if reading_text == "Fully Redundant":
health_delta += 0
messages.append(f"Input power sensor {reading_sensor} reports {reading_text}")
elif reading_text == "No Reading":
health_delta += 5
messages.append("Input power sensor {reading_sensor} reports {reading_text}, redundant power not configured")
else:
health_delta += 10
messages.append(f"Input power sensor {reading_sensor} reports {reading_text}")
else:
health_delta = 5
messages.append("No valid input power sensors found, but configured")
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(', '.join(messages))
# Return our local PluginResult object
return self.plugin_result
def cleanup(self):
"""
cleanup(): Perform special cleanup steps during node daemon termination
This step is optional and should be used sparingly.
"""
pass
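
For reference, a tiny self-contained sketch of the pipe-delimited line format the split('|') parsing above expects; the sample sensor line is made up, and real "ipmitool sdr type 'Power Supply'" output varies by vendor (hence the search_values list):

# Hypothetical sample line; real ipmitool output differs between platforms
sample_line = "PS Redundancy    | 0Bh | ok  |  7.1 | Fully Redundant"

fields = sample_line.split('|')
sensor_name = fields[0].strip()     # matched against search_values ("PS Redundancy")
reading_sensor = fields[1].strip()  # sensor ID column used in the log message ("0Bh")
reading_text = fields[-1].strip()   # status the health delta is derived from
print(f"{sensor_name}: sensor {reading_sensor} reports {reading_text}")
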

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -100,6 +100,8 @@ pvc:
vm_shutdown_timeout: 180
# keepalive_interval: Number of seconds between keepalive/status updates
keepalive_interval: 5
# monitoring_interval: Number of seconds between monitoring check updates
monitoring_interval: 60
# fence_intervals: Number of keepalive_intervals to declare a node dead and fence it
fence_intervals: 6
# suicide_intervals: Number of keepalive_intervals before a node considers itself dead and self-fences, 0 to disable

View File

@@ -235,12 +235,12 @@ def entrypoint():
# Force into secondary coordinator state if needed
try:
if this_node.router_state == "primary" and len(d_node) > 1:
if this_node.coordinator_state == "primary" and len(d_node) > 1:
zkhandler.write([("base.config.primary_node", "none")])
logger.out("Waiting for primary migration", state="s")
timeout = 240
count = 0
while this_node.router_state != "secondary" and count < timeout:
while this_node.coordinator_state != "secondary" and count < timeout:
sleep(0.5)
count += 1
except Exception:
@@ -255,10 +255,10 @@ def entrypoint():
except Exception:
pass
# Clean up any monitoring plugins that have cleanup
# Shut down the monitoring system
try:
logger.out("Performing monitoring plugin cleanup", state="s")
monitoring_instance.run_cleanups()
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
@@ -431,7 +431,7 @@ def entrypoint():
if new_primary == "none":
if (
this_node.daemon_state == "run"
and this_node.router_state
and this_node.coordinator_state
not in ["primary", "takeover", "relinquish"]
):
logger.out(
@@ -477,7 +477,7 @@ def entrypoint():
state="i",
)
elif new_primary == config["node_hostname"]:
if this_node.router_state == "secondary":
if this_node.coordinator_state == "secondary":
# Wait for 0.5s to ensure other contentions time out, then take over
sleep(0.5)
zkhandler.write(
@@ -489,7 +489,7 @@ def entrypoint():
]
)
else:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
# Wait for 0.5s to ensure other contentions time out, then relinquish
sleep(0.5)
zkhandler.write(
@@ -536,7 +536,7 @@ def entrypoint():
)
# Start primary functionality
if (
this_node.router_state == "primary"
this_node.coordinator_state == "primary"
and d_network[network].nettype == "managed"
):
d_network[network].createGateways()
@@ -549,7 +549,7 @@ def entrypoint():
# TODO: Move this to the Network structure
if d_network[network].nettype == "managed":
# Stop primary functionality
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
d_network[network].stopDHCPServer()
d_network[network].removeGateways()
dns_aggregator.remove_network(d_network[network])

View File

@@ -26,6 +26,7 @@ import importlib.util
from os import walk
from datetime import datetime
from json import dumps
from apscheduler.schedulers.background import BackgroundScheduler
class PluginError(Exception):
@@ -173,9 +174,11 @@ class MonitoringPlugin(object):
"""
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Run the plugin, returning a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
return self.plugin_result
@@ -332,10 +335,41 @@ class MonitoringInstance(object):
)
)
self.start_check_timer()
self.run_plugins()
def __del__(self):
self.shutdown()
def shutdown(self):
self.stop_check_timer()
self.run_cleanups()
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
self.logger.out(
f"Starting monitoring check timer ({check_interval} second interval)",
state="s",
)
self.check_timer = BackgroundScheduler()
self.check_timer.add_job(
self.run_plugins,
trigger="interval",
seconds=check_interval,
)
self.check_timer.start()
def stop_check_timer(self):
try:
self.check_timer.shutdown()
self.logger.out("Stopping monitoring check timer", state="s")
except Exception:
self.logger.out("Failed to stop monitoring check timer", state="w")
def run_plugin(self, plugin):
time_start = datetime.now()
try:
result = plugin.run()
result = plugin.run(coordinator_state=self.this_node.coordinator_state)
except Exception as e:
self.logger.out(
f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
@@ -351,12 +385,17 @@ class MonitoringInstance(object):
return result
def run_plugins(self):
if self.this_node.coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif self.this_node.coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
total_health = 100
if self.config["log_keepalive_plugin_details"]:
self.logger.out(
f"Running monitoring plugins: {', '.join([x.plugin_name for x in self.all_plugins])}",
state="t",
)
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
@@ -390,6 +429,38 @@ class MonitoringInstance(object):
]
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
time.sleep(0.2)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = str(self.this_node.health) + "%"
else:
health_colour = self.logger.fmt_blue
health_text = "N/A"
self.logger.out(
"{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
health_colour=health_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
health=health_text,
runtime=runtime,
),
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()
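
As a standalone illustration of the APScheduler pattern the new check timer relies on, a minimal sketch is below; the 60-second interval and the job body are placeholders, not taken from this changeset:

from time import sleep

from apscheduler.schedulers.background import BackgroundScheduler


def run_checks():
    # Placeholder for MonitoringInstance.run_plugins()
    print("running monitoring checks")


scheduler = BackgroundScheduler()
scheduler.add_job(run_checks, trigger="interval", seconds=60)
scheduler.start()

try:
    sleep(180)  # let the interval job fire a few times
finally:
    scheduler.shutdown()  # equivalent to stop_check_timer() above
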

View File

@@ -52,7 +52,7 @@ class NodeInstance(object):
# States
self.daemon_mode = self.zkhandler.read(("node.mode", self.name))
self.daemon_state = "stop"
self.router_state = "client"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Object lists
self.d_node = d_node
@@ -149,10 +149,10 @@ class NodeInstance(object):
if self.name == self.this_node and self.daemon_mode == "coordinator":
# We're a coordinator so we care about networking
if data != self.router_state:
self.router_state = data
if data != self.coordinator_state:
self.coordinator_state = data
if self.config["enable_networking"]:
if self.router_state == "takeover":
if self.coordinator_state == "takeover":
self.logger.out(
"Setting node {} to primary state".format(self.name),
state="i",
@@ -161,7 +161,7 @@ class NodeInstance(object):
target=self.become_primary, args=(), kwargs={}
)
transition_thread.start()
if self.router_state == "relinquish":
if self.coordinator_state == "relinquish":
# Skip becoming secondary unless already running
if (
self.daemon_state == "run"
@@ -539,7 +539,7 @@ class NodeInstance(object):
tick = 1
patroni_failed = True
# As long as we're in takeover, keep trying to set the Patroni leader to us
while self.router_state == "takeover":
while self.coordinator_state == "takeover":
# Switch Patroni leader to the local instance
retcode, stdout, stderr = common.run_os_command(
"""

View File

@@ -306,11 +306,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if data and self.ip6_gateway != data.decode("ascii"):
orig_gateway = self.ip6_gateway
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
if orig_gateway:
self.removeGateway6Address()
self.ip6_gateway = data.decode("ascii")
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.createGateway6Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -333,13 +333,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if (
self.dhcp6_flag
and not self.dhcp_server_daemon
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.startDHCPServer()
elif (
self.dhcp_server_daemon
and not self.dhcp4_flag
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.stopDHCPServer()
@@ -371,11 +371,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if data and self.ip4_gateway != data.decode("ascii"):
orig_gateway = self.ip4_gateway
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
if orig_gateway:
self.removeGateway4Address()
self.ip4_gateway = data.decode("ascii")
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.createGateway4Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -398,13 +398,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if (
self.dhcp4_flag
and not self.dhcp_server_daemon
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.startDHCPServer()
elif (
self.dhcp_server_daemon
and not self.dhcp6_flag
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.stopDHCPServer()
@@ -450,7 +450,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if self.dhcp_reservations != new_reservations:
old_reservations = self.dhcp_reservations
self.dhcp_reservations = new_reservations
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.updateDHCPReservations(old_reservations, new_reservations)
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -706,7 +706,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
self.createGateway4Address()
def createGateway6Address(self):
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.logger.out(
"Creating gateway {}/{} on interface {}".format(
self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic
@@ -719,7 +719,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
)
def createGateway4Address(self):
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.logger.out(
"Creating gateway {}/{} on interface {}".format(
self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic
@@ -733,7 +733,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
def startDHCPServer(self):
if (
self.this_node.router_state in ["primary", "takeover"]
self.this_node.coordinator_state in ["primary", "takeover"]
and self.nettype == "managed"
):
self.logger.out(

View File

@@ -246,6 +246,7 @@ def get_configuration():
config_intervals = {
"vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
"keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
"monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
"fence_intervals": int(o_intervals.get("fence_intervals", 6)),
"suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
}

View File

@@ -98,7 +98,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
return
# Primary-only functions
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
@@ -417,7 +417,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Upload OSD data for the cluster (primary-only)
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
@@ -679,9 +679,9 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
cst_colour = logger.fmt_green
elif this_node.router_state == "secondary":
elif this_node.coordinator_state == "secondary":
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
@@ -692,7 +692,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
datetime.now(),
logger.fmt_end,
logger.fmt_bold + cst_colour,
this_node.router_state,
this_node.coordinator_state,
logger.fmt_end,
),
state="t",
@@ -700,7 +700,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.migration_target_selector")
@@ -723,7 +723,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
# Set the upstream IP in Zookeeper for clients to read
if config["enable_networking"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.upstream_ip")
@@ -757,7 +757,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
@@ -859,11 +859,6 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
except Exception:
logger.out("Failed to set keepalive data", state="e")
# Run this here since monitoring plugins output directly
monitoring_instance.run_plugins()
# Allow the health value to update in the Node instance
time.sleep(0.1)
if config["log_keepalives"]:
if this_node.maintenance is True:
maintenance_colour = logger.fmt_blue