Compare commits

...

18 Commits

Author SHA1 Message Date
f46bfc962f Bump version to 0.9.75 2023-09-16 23:06:38 -04:00
714d4b6005 Revert float conversion of cpu_cores
Results in much uglier output, there are no decimal core counts.
2023-09-16 23:06:07 -04:00
fa8329ac3d Explicitly round load avg in load plugin 2023-09-16 22:58:49 -04:00
457b7bed3d Handle exceptions in fence migrations 2023-09-16 22:56:09 -04:00
86115b2928 Add startup message for IPMI reachability
It's good to know that this succeeded in addition to knowing if it
failed.
2023-09-16 22:41:58 -04:00
1a906b589e Bump version to 0.9.74 2023-09-16 00:18:13 -04:00
7b230d8bd5 Add monitoring plugin for hardware RAID arrays 2023-09-16 00:02:53 -04:00
48662e90c1 Remove obsolete monitoring_instance passing 2023-09-15 22:47:45 -04:00
079381c03e Move printing to end and add runtime 2023-09-15 22:40:09 -04:00
794cea4a02 Reverse ordering, run checks before starting timer 2023-09-15 22:25:37 -04:00
fa24f3ba75 Fix bad fstring in psur check 2023-09-15 22:19:49 -04:00
caadafa80d Add PSU redundancy sensor check 2023-09-15 19:07:29 -04:00
479e156234 Run monitoring plugins once on startup 2023-09-15 17:53:16 -04:00
86830286f3 Adjust message printing to be on one line 2023-09-15 17:00:34 -04:00
4d51318a40 Make monitoring interval configurable 2023-09-15 16:54:51 -04:00
cba6f5be48 Fix wording of non-coordinator state 2023-09-15 16:51:04 -04:00
254303b9d4 Use coordinator_state instead of router_state
Makes it much clearer what this variable represents.
2023-09-15 16:47:56 -04:00
40b7d68853 Separate monitoring and move to 60s interval
Removes the dependency of the monitoring subsystem from the node
keepalives, and runs them at a 60s interval to avoid excessive backups
if a plugin takes too long.

Adds its own logs and related items as required.

Finally adds a new required argument to the run() of plugins, the
coordinator state, which can be used by a plugin to determine actions
based on whether the node is a primary, secondary, or non-coordinator.
2023-09-15 16:47:11 -04:00
24 changed files with 604 additions and 80 deletions

View File

@@ -1 +1 @@
0.9.73
0.9.75

View File

@@ -1,5 +1,19 @@
## PVC Changelog
###### [v0.9.75](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.75)
* [Node Daemon] Adds a startup message about IPMI when succeeding
* [Node Daemon] Fixes a bug in fencing allowing non-failing VMs to migrate
* [Node Daemon] Adds rounding to load average in load plugin for consistency
###### [v0.9.74](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.74)
* [Docs] Removes docs from the main repo
* [Client CLI] Ensures that "provision" VMs are shown in the right colour
* [Node Daemon] Separates the node monitoring subsystem into its own thread with a longer, customizable update interval
* [Node Daemon] Adds checks for PSU input power reundancy (psur) and hardware RAID (hwrd)
* [Node Daemon] Updates when Keepalive start messages are printed (end of run, with runtime) to align with new monitoring messages
###### [v0.9.73](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.73)
* [Node Daemon] Fixes a bug creating monitoring instance

View File

@@ -27,7 +27,7 @@ from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
# Daemon version
version = "0.9.73"
version = "0.9.75"
# API version
API_VERSION = 1.0

View File

@@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="pvc",
version="0.9.73",
version="0.9.75",
packages=["pvc.cli", "pvc.lib"],
install_requires=[
"Click",

18
debian/changelog vendored
View File

@@ -1,3 +1,21 @@
pvc (0.9.75-0) unstable; urgency=high
* [Node Daemon] Adds a startup message about IPMI when succeeding
* [Node Daemon] Fixes a bug in fencing allowing non-failing VMs to migrate
* [Node Daemon] Adds rounding to load average in load plugin for consistency
-- Joshua M. Boniface <joshua@boniface.me> Sat, 16 Sep 2023 23:06:38 -0400
pvc (0.9.74-0) unstable; urgency=high
* [Docs] Removes docs from the main repo
* [Client CLI] Ensures that "provision" VMs are shown in the right colour
* [Node Daemon] Separates the node monitoring subsystem into its own thread with a longer, customizable update interval
* [Node Daemon] Adds checks for PSU input power reundancy (psur) and hardware RAID (hwrd)
* [Node Daemon] Updates when Keepalive start messages are printed (end of run, with runtime) to align with new monitoring messages
-- Joshua M. Boniface <joshua@boniface.me> Sat, 16 Sep 2023 00:18:13 -0400
pvc (0.9.73-0) unstable; urgency=high
* [Node Daemon] Fixes a bug creating monitoring instance

View File

@@ -100,9 +100,11 @@ class MonitoringPluginScript(MonitoringPlugin):
self.disk_details = disk_details
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Re-run setup each time to ensure the disk details are current

View File

@@ -62,9 +62,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

247
node-daemon/plugins/hwrd Normal file
View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
# hwrd.py - PVC Monitoring example plugin for hardware RAID Arrays
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check any hardwrae RAID virtual disks for health and report errors.
# Supports Dell BOSS cards, LSI/Avago/Broadcom MegaRAID, and HP SmartArray RAID.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring pluginscript and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
# of the role of each function is provided in context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "hwrd"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
def check_dellboss(self):
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
health_delta = 0
messages = list()
_dellboss_ret, _dellboss_list, _ = run_os_command("mvcli info -o vd")
if _dellboss_ret != 0:
health_delta = 50
messages.append("Error running MVCLI command")
else:
arrays = list()
idx = None
for line in _dellboss_list.split('\n'):
if match(r"^id:", line):
idx = int(line.split(":")[-1].strip())
arrays.append(dict())
if match(r"^name:", line):
arrays[idx]["name"] = line.split(":")[-1].strip()
if match(r"^status:", line):
arrays[idx]["status"] = line.split(":")[-1].strip()
for idx, array in enumerate(arrays):
if array["status"] != "functional":
health_delta += 50
messages.append(f"RAID Dell BOSS ID {idx} (Name: {array['name']}, State: {array['status']})")
if len(messages) < 1:
messages.append(f"No valid RAID arrays found")
return health_delta, messages
def check_megaraid(self):
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
health_delta = 0
messages = list()
_megaraid_ret, _megaraid_list, _ = run_os_command("megacli -LDInfo -Lall -aALL")
if _megaraid_ret != 0:
health_delta = 50
messages.append("Error running MegaCLI command")
else:
vd_list = _megaraid_list.split('\n\n\n')
for idx, _vd in enumerate(vd_list):
vd = _vd.split('\n')
if "Virtual Drive Information" not in vd[2]:
continue
raid_name = None
raid_count = 0
raid_state = None
for entry in vd:
if len(entry.split(':')) < 2:
continue
entry_key = entry.split(':')[0].strip()
entry_value = entry.split(':')[1].strip()
if entry_key == "State":
raid_state = entry_value
if entry_key == "Name":
raid_name = entry_value
if entry_key == "Number Of Drives":
raid_count = entry_value
if raid_state is None or raid_name is None or raid_count == 0:
health_delta += 10
messages.append(f"RAID ID {idx} did not report useful values")
continue
if raid_state != "Optimal":
health_delta += 50
messages.append(f"RAID MegaRAID ID {idx} (Name: {raid_name}, Disks: {raid_count}, State: {raid_state})")
if len(messages) < 1:
messages.append(f"No valid RAID arrays found")
return health_delta, messages
def check_hpsa(self):
# Run any imports first
from daemon_lib.common import run_os_command
from re import match, findall
health_delta = 0
messages = list()
_hparray_ret, _hparray_list, _ = run_os_command(f"ssacli ctrl slot={self.controller_slot} ld all show")
if _hparray_ret != 0:
health_delta = 50
messages.append("Error running SSACLI command")
else:
vd_lines = _hparray_list.split('\n\n')
arrays = list()
cur_array = None
for idx, _line in enumerate(vd_lines):
line = _line.strip()
if match(r"^Array", line):
cur_array = line
if match(r"^logicaldrive", line) and cur_array is not None:
arrays.append(f"{cur_array} {line}")
for array in arrays:
if "OK" not in array:
health_delta += 50
messages.append(f"RAID HPSA {array}")
if len(messages) < 1:
messages.append(f"No valid RAID arrays found")
return health_delta, messages
def setup(self):
"""
setup(): Perform special setup steps during node daemon startup
This step is optional and should be used sparingly.
If you wish for the plugin to not load in certain conditions, do any checks here
and return a non-None failure message to indicate the error.
"""
from daemon_lib.common import run_os_command
from re import match, findall
self.raid_type = list()
_dellboss_ret, _dellboss_list, _ = run_os_command("mvcli info -o vd")
if _dellboss_ret == 0:
# If this returns 0 at all, there's a valid BOSS card to manage
self.raid_type.append("dellboss")
_megaraid_ret, _megaraid_list, _ = run_os_command("megacli -LDInfo -Lall -aALL")
if _megaraid_ret == 0:
vd_list = _megaraid_list.split('\n\n\n')
for idx, _vd in enumerate(vd_list):
vd = _vd.split('\n')
if "Virtual Drive Information" in vd[2]:
self.raid_type.append("megaraid")
_hpraid_ret, _hpraid_list, _ = run_os_command("ssacli ctrl all show status")
if _hpraid_ret == 0:
for line in _hpraid_list.split('\n'):
if match(r"^Smart", line):
controller_slots = findall("Slot ([0-9])", line)
if len(controller_slots) > 0:
self.raid_type.append("hpsa")
self.controller_slot = controller_slots[0]
if len(self.raid_type) < 1:
return "No hardware RAID management commands found"
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
"""
health_delta = 0
messages = list()
raid_function_map = {
"megaraid": self.check_megaraid,
"hpsa": self.check_hpsa,
"dellboss": self.check_dellboss,
}
for raid_type in self.raid_type:
_health_delta, _messages = raid_function_map.get(raid_type)()
health_delta += _health_delta
messages += _messages
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(', '.join(messages))
# Return our local PluginResult object
return self.plugin_result
def cleanup(self):
"""
cleanup(): Perform special cleanup steps during node daemon termination
This step is optional and should be used sparingly.
"""
pass

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first
@@ -70,7 +72,7 @@ class MonitoringPluginScript(MonitoringPlugin):
from psutil import cpu_count
# Get the current 1-minute system load average
load_average = getloadavg()[0]
load_average = float(round(getloadavg()[0], 2))
# Get the number of CPU cores
cpu_cores = cpu_count()

View File

@@ -61,9 +61,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -57,9 +57,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

138
node-daemon/plugins/psur Normal file
View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# psur.py - PVC Monitoring example plugin for PSU Redundancy
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check IPMI for power supply reundancy status.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring pluginscript and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
# of the role of each function is provided in context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "psur"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
def setup(self):
"""
setup(): Perform special setup steps during node daemon startup
This step is optional and should be used sparingly.
If you wish for the plugin to not load in certain conditions, do any checks here
and return a non-None failure message to indicate the error.
"""
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
_ipmitool_ret, _ipmitool_list, _ = run_os_command("ipmitool sdr type 'Power Supply'")
if _ipmitool_ret != 0:
return "Error running ipmitool command"
else:
search_values = [
"PS Redundancy", # Dell PowerEdge
"Power Supplies", # HP ProLiant
"PS_RDNDNT_MODE", # Cisco UCS
]
reading_lines = [l for l in _ipmitool_list.split('\n') if len(l.split('|')) > 0 and l.split('|')[0].strip() in search_values]
if len(reading_lines) < 1:
return "No valid input power sensors found"
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first
from daemon_lib.common import run_os_command
from re import match
health_delta = 0
messages = list()
_ipmitool_ret, _ipmitool_list, _ = run_os_command("ipmitool sdr type 'Power Supply'")
if _ipmitool_ret != 0 or len(_ipmitool_list.split('\n')) < 1:
health_delta = 0
messages.append("Error running ipmitool command")
else:
search_values = [
"PS Redundancy", # Dell PowerEdge
"Power Supplies", # HP ProLiant
"PS_RDNDNT_MODE", # Cisco UCS
]
reading_lines = [l for l in _ipmitool_list.split('\n') if len(l.split('|')) > 0 and l.split('|')[0].strip() in search_values]
if len(reading_lines) > 0:
for reading_line in reading_lines:
reading_sensor = reading_line.split('|')[1].strip()
reading_text = reading_line.split('|')[-1].strip()
if reading_text == "Fully Redundant":
health_delta += 0
messages.append(f"Input power sensor {reading_sensor} reports {reading_text}")
elif reading_text == "No Reading":
health_delta += 5
messages.append(f"Input power sensor {reading_sensor} reports {reading_text} (PSU redundancy not configured?)")
else:
health_delta += 10
messages.append(f"Input power sensor {reading_sensor} reports {reading_text}")
else:
health_delta = 5
messages.append("No valid input power sensors found, but configured")
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(', '.join(messages))
# Return our local PluginResult object
return self.plugin_result
def cleanup(self):
"""
cleanup(): Perform special cleanup steps during node daemon termination
This step is optional and should be used sparingly.
"""
pass

View File

@@ -60,9 +60,11 @@ class MonitoringPluginScript(MonitoringPlugin):
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Perform the check actions and return a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
# Run any imports first

View File

@@ -100,6 +100,8 @@ pvc:
vm_shutdown_timeout: 180
# keepalive_interval: Number of seconds between keepalive/status updates
keepalive_interval: 5
# monitoring_interval: Number of seconds between monitoring check updates
monitoring_interval: 60
# fence_intervals: Number of keepalive_intervals to declare a node dead and fence it
fence_intervals: 6
# suicide_intervals: Numer of keepalive_intervals before a node considers itself dead and self-fences, 0 to disable

View File

@@ -49,7 +49,7 @@ import re
import json
# Daemon version
version = "0.9.73"
version = "0.9.75"
##########################################################
@@ -235,12 +235,12 @@ def entrypoint():
# Force into secondary coordinator state if needed
try:
if this_node.router_state == "primary" and len(d_node) > 1:
if this_node.coordinator_state == "primary" and len(d_node) > 1:
zkhandler.write([("base.config.primary_node", "none")])
logger.out("Waiting for primary migration", state="s")
timeout = 240
count = 0
while this_node.router_state != "secondary" and count < timeout:
while this_node.coordinator_state != "secondary" and count < timeout:
sleep(0.5)
count += 1
except Exception:
@@ -255,10 +255,10 @@ def entrypoint():
except Exception:
pass
# Clean up any monitoring plugins that have cleanup
# Shut down the monitoring system
try:
logger.out("Performing monitoring plugin cleanup", state="s")
monitoring_instance.run_cleanups()
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
@@ -324,9 +324,14 @@ def entrypoint():
config["ipmi_hostname"], config["ipmi_username"], config["ipmi_password"]
):
logger.out(
"Our IPMI is not reachable; fencing of this node will likely fail",
"Our IPMI interface is not reachable; fencing of this node will fail until corrected",
state="w",
)
else:
logger.out(
"Our IPMI interface is reachable; fencing of this node is possible",
state="o",
)
# Validate libvirt
if not pvcnoded.util.libvirt.validate_libvirtd(logger, config):
@@ -431,7 +436,7 @@ def entrypoint():
if new_primary == "none":
if (
this_node.daemon_state == "run"
and this_node.router_state
and this_node.coordinator_state
not in ["primary", "takeover", "relinquish"]
):
logger.out(
@@ -477,7 +482,7 @@ def entrypoint():
state="i",
)
elif new_primary == config["node_hostname"]:
if this_node.router_state == "secondary":
if this_node.coordinator_state == "secondary":
# Wait for 0.5s to ensure other contentions time out, then take over
sleep(0.5)
zkhandler.write(
@@ -489,7 +494,7 @@ def entrypoint():
]
)
else:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
# Wait for 0.5s to ensure other contentions time out, then relinquish
sleep(0.5)
zkhandler.write(
@@ -536,7 +541,7 @@ def entrypoint():
)
# Start primary functionality
if (
this_node.router_state == "primary"
this_node.coordinator_state == "primary"
and d_network[network].nettype == "managed"
):
d_network[network].createGateways()
@@ -549,7 +554,7 @@ def entrypoint():
# TODO: Move this to the Network structure
if d_network[network].nettype == "managed":
# Stop primary functionality
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
d_network[network].stopDHCPServer()
d_network[network].removeGateways()
dns_aggregator.remove_network(d_network[network])
@@ -1024,14 +1029,14 @@ def entrypoint():
state="i",
)
# Set up the node monitoring instance
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node, monitoring_instance
logger, config, zkhandler, this_node
)
# Tick loop; does nothing since everything is async

View File

@@ -26,6 +26,7 @@ import importlib.util
from os import walk
from datetime import datetime
from json import dumps
from apscheduler.schedulers.background import BackgroundScheduler
class PluginError(Exception):
@@ -173,9 +174,11 @@ class MonitoringPlugin(object):
"""
pass
def run(self):
def run(self, coordinator_state=None):
"""
run(): Run the plugin, returning a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
return self.plugin_result
@@ -332,10 +335,41 @@ class MonitoringInstance(object):
)
)
self.run_plugins()
self.start_check_timer()
def __del__(self):
self.shutdown()
def shutdown(self):
self.stop_check_timer()
self.run_cleanups()
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
self.logger.out(
f"Starting monitoring check timer ({check_interval} second interval)",
state="s",
)
self.check_timer = BackgroundScheduler()
self.check_timer.add_job(
self.run_plugins,
trigger="interval",
seconds=check_interval,
)
self.check_timer.start()
def stop_check_timer(self):
try:
self.check_timer.shutdown()
self.logger.out("Stopping monitoring check timer", state="s")
except Exception:
self.logger.out("Failed to stop monitoring check timer", state="w")
def run_plugin(self, plugin):
time_start = datetime.now()
try:
result = plugin.run()
result = plugin.run(coordinator_state=self.this_node.coordinator_state)
except Exception as e:
self.logger.out(
f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
@@ -351,12 +385,17 @@ class MonitoringInstance(object):
return result
def run_plugins(self):
if self.this_node.coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif self.this_node.coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
total_health = 100
if self.config["log_keepalive_plugin_details"]:
self.logger.out(
f"Running monitoring plugins: {', '.join([x.plugin_name for x in self.all_plugins])}",
state="t",
)
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
@@ -390,6 +429,38 @@ class MonitoringInstance(object):
]
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
time.sleep(0.2)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = str(self.this_node.health) + "%"
else:
health_colour = self.logger.fmt_blue
health_text = "N/A"
self.logger.out(
"{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
health_colour=health_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
health=health_text,
runtime=runtime,
),
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()

View File

@@ -52,7 +52,7 @@ class NodeInstance(object):
# States
self.daemon_mode = self.zkhandler.read(("node.mode", self.name))
self.daemon_state = "stop"
self.router_state = "client"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Object lists
self.d_node = d_node
@@ -149,10 +149,10 @@ class NodeInstance(object):
if self.name == self.this_node and self.daemon_mode == "coordinator":
# We're a coordinator so we care about networking
if data != self.router_state:
self.router_state = data
if data != self.coordinator_state:
self.coordinator_state = data
if self.config["enable_networking"]:
if self.router_state == "takeover":
if self.coordinator_state == "takeover":
self.logger.out(
"Setting node {} to primary state".format(self.name),
state="i",
@@ -161,7 +161,7 @@ class NodeInstance(object):
target=self.become_primary, args=(), kwargs={}
)
transition_thread.start()
if self.router_state == "relinquish":
if self.coordinator_state == "relinquish":
# Skip becoming secondary unless already running
if (
self.daemon_state == "run"
@@ -539,7 +539,7 @@ class NodeInstance(object):
tick = 1
patroni_failed = True
# As long as we're in takeover, keep trying to set the Patroni leader to us
while self.router_state == "takeover":
while self.coordinator_state == "takeover":
# Switch Patroni leader to the local instance
retcode, stdout, stderr = common.run_os_command(
"""

View File

@@ -306,11 +306,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if data and self.ip6_gateway != data.decode("ascii"):
orig_gateway = self.ip6_gateway
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
if orig_gateway:
self.removeGateway6Address()
self.ip6_gateway = data.decode("ascii")
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.createGateway6Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -333,13 +333,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if (
self.dhcp6_flag
and not self.dhcp_server_daemon
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.startDHCPServer()
elif (
self.dhcp_server_daemon
and not self.dhcp4_flag
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.stopDHCPServer()
@@ -371,11 +371,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if data and self.ip4_gateway != data.decode("ascii"):
orig_gateway = self.ip4_gateway
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
if orig_gateway:
self.removeGateway4Address()
self.ip4_gateway = data.decode("ascii")
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.createGateway4Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -398,13 +398,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if (
self.dhcp4_flag
and not self.dhcp_server_daemon
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.startDHCPServer()
elif (
self.dhcp_server_daemon
and not self.dhcp6_flag
and self.this_node.router_state in ["primary", "takeover"]
and self.this_node.coordinator_state in ["primary", "takeover"]
):
self.stopDHCPServer()
@@ -450,7 +450,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
if self.dhcp_reservations != new_reservations:
old_reservations = self.dhcp_reservations
self.dhcp_reservations = new_reservations
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.updateDHCPReservations(old_reservations, new_reservations)
if self.dhcp_server_daemon:
self.stopDHCPServer()
@@ -706,7 +706,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
self.createGateway4Address()
def createGateway6Address(self):
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.logger.out(
"Creating gateway {}/{} on interface {}".format(
self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic
@@ -719,7 +719,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
)
def createGateway4Address(self):
if self.this_node.router_state in ["primary", "takeover"]:
if self.this_node.coordinator_state in ["primary", "takeover"]:
self.logger.out(
"Creating gateway {}/{} on interface {}".format(
self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic
@@ -733,7 +733,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
def startDHCPServer(self):
if (
self.this_node.router_state in ["primary", "takeover"]
self.this_node.coordinator_state in ["primary", "takeover"]
and self.nettype == "managed"
):
self.logger.out(

View File

@@ -246,6 +246,7 @@ def get_configuration():
config_intervals = {
"vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
"keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
"monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
"fence_intervals": int(o_intervals.get("fence_intervals", 6)),
"suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
}

View File

@@ -153,7 +153,13 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
# Loop through the VMs
for dom_uuid in dead_node_running_domains:
fence_migrate_vm(dom_uuid)
try:
fence_migrate_vm(dom_uuid)
except Exception as e:
logger.out(
f"Failed to migrate VM {dom_uuid}, continuing: {e}",
state="w",
)
# Set node in flushed state for easy remigrating when it comes back
zkhandler.write([(("node.state.domain", node_name), "flushed")])

View File

@@ -51,7 +51,7 @@ libvirt_vm_states = {
}
def start_keepalive_timer(logger, config, zkhandler, this_node, monitoring_instance):
def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
@@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node, monitoring_insta
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node, monitoring_instance),
args=(logger, config, zkhandler, this_node),
trigger="interval",
seconds=keepalive_interval,
)
@@ -98,7 +98,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
return
# Primary-only functions
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
@@ -417,7 +417,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Upload OSD data for the cluster (primary-only)
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
@@ -674,33 +674,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
def node_keepalive(logger, config, zkhandler, this_node):
debug = config["debug"]
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
cst_colour = logger.fmt_green
elif this_node.router_state == "secondary":
elif this_node.coordinator_state == "secondary":
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
logger.out(
"{}{} keepalive @ {}{} [{}{}{}]".format(
logger.fmt_purple,
config["node_hostname"],
datetime.now(),
logger.fmt_end,
logger.fmt_bold + cst_colour,
this_node.router_state,
logger.fmt_end,
),
state="t",
)
active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now()
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.migration_target_selector")
@@ -723,7 +715,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
# Set the upstream IP in Zookeeper for clients to read
if config["enable_networking"]:
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.upstream_ip")
@@ -757,7 +749,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.router_state == "primary":
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
@@ -859,12 +851,24 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
except Exception:
logger.out("Failed to set keepalive data", state="e")
# Run this here since monitoring plugins output directly
monitoring_instance.run_plugins()
# Allow the health value to update in the Node instance
time.sleep(0.1)
if config["log_keepalives"]:
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
logger.out(
"{start_colour}{hostname} keepalive @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] in {runtime} seconds".format(
start_colour=logger.fmt_purple,
cst_colour=logger.fmt_bold + cst_colour,
nofmt=logger.fmt_end,
hostname=config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
runtime=runtime,
),
state="t",
)
if this_node.maintenance is True:
maintenance_colour = logger.fmt_blue
else: