Initial implementation of monitoring plugin system

This commit is contained in:
2023-02-13 03:06:06 -05:00
parent aeb238f43c
commit 3c742a827b
10 changed files with 664 additions and 9 deletions

158
node-daemon/plugins/dpkg Normal file
View File

@ -0,0 +1,158 @@
#!/usr/bin/env python3
# dpkg.py - PVC Monitoring example plugin for dpkg status
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check the system dpkg status is as expected, with no invalid
# packages or obsolete configuration files, and will return a 1 health delta for each
# flaw in invalid packages, upgradable packages, and obsolete config files.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring pluginscript and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
# of the role of each function is provided in context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "dpkg"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
def setup(self):
"""
setup(): Perform special setup steps during node daemon startup
This step is optional and should be used sparingly.
"""
pass
def run(self):
"""
run(): Perform the check actions and return a PluginResult object
"""
# Run any imports first
from re import match
from json import dumps
import daemon_lib.common as pvc_common
# Get Debian version
with open('/etc/debian_version', 'r') as fh:
debian_version = fh.read().strip()
# Get a list of dpkg packages for analysis
retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/dpkg --list")
# Get a list of installed packages and states
packages = list()
for dpkg_line in stdout.split('\n'):
if match('^[a-z][a-z] ', dpkg_line):
line_split = dpkg_line.split()
package_state = line_split[0]
package_name = line_split[1]
packages.append((package_name, package_state))
count_ok = 0
count_inconsistent = 0
list_inconsistent = list()
for package in packages:
if package[1] == "ii":
count_ok += 1
else:
count_inconsistent += 1
list_inconsistent.append(package[0])
# Get upgradable packages
retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/apt list --upgradable")
list_upgradable = list()
for apt_line in stdout.split('\n'):
if match('^[a-z][a-z] ', apt_line):
line_split = apt_line.split('/')
package_name = line_split[0]
list_upgradable.append(package_name)
count_upgradable = len(list_upgradable)
# Get obsolete config files (dpkg-* or ucf-* under /etc)
retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/find /etc -type f -a \( -name '*.dpkg-*' -o -name '*.ucf-*' \)")
obsolete_conffiles = list()
for conffile_line in stdout.split('\n'):
if conffile_line:
obsolete_conffiles.append(conffile_line)
count_obsolete_conffiles = len(obsolete_conffiles)
# Set health_delta based on the results
health_delta = 0
if count_inconsistent > 0:
health_delta += 1
if count_upgradable > 0:
health_delta += 1
if count_obsolete_conffiles > 0:
health_delta += 1
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Craft the message
message = f"Debian {debian_version}; Obsolete conffiles: {count_obsolete_conffiles}; Packages valid: {count_ok}, inconsistent: {count_inconsistent}, upgradable: {count_upgradable}"
# Set the message in our local PluginResult object
self.plugin_result.set_message(message)
# Set the detailed data in our local PluginResult object
detailed_data = {
"debian_version": debian_version,
"obsolete_conffiles": obsolete_conffiles,
"inconsistent_packages": list_inconsistent,
"upgradable_packages": list_upgradable,
}
self.plugin_result.set_data(dumps(detailed_data))
# Return our local PluginResult object
return self.plugin_result
def cleanup(self):
"""
cleanup(): Perform special cleanup steps during node daemon termination
This step is optional and should be used sparingly.
"""
pass

105
node-daemon/plugins/load Normal file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
# load.py - PVC Monitoring example plugin for load
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check the system load against the total number of CPU cores,
# and return a 10 health delta (100 -> 90) if the load average is > 1/2 that number.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring pluginscript and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
# of the role of each function is provided in context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "load"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
def setup(self):
"""
setup(): Perform special setup steps during node daemon startup
This step is optional and should be used sparingly.
"""
pass
def run(self):
"""
run(): Perform the check actions and return a PluginResult object
"""
# Run any imports first
from os import getloadavg
from psutil import cpu_count
# Get the current 1-minute system load average
load_average = getloadavg()[0]
# Get the number of CPU cores
cpu_cores = cpu_count()
# Check that the load average is greater or equal to the cpu count
if load_average > float(cpu_cores):
# Set the health delta to 10 (subtract 10 from the total of 100)
health_delta = 10
# Craft a message that can be used by the clients
message = f"Current load is {load_average} out of {cpu_cores} CPU cores"
else:
# Set the health delta to 0 (no change)
health_delta = 0
# Craft a message that can be used by the clients
message = f"Current load is {load_average} out of {cpu_cores} CPU cores"
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(message)
# Return our local PluginResult object
return self.plugin_result
def cleanup(self):
"""
cleanup(): Perform special cleanup steps during node daemon termination
This step is optional and should be used sparingly.
"""
pass

View File

@ -128,6 +128,8 @@ pvc:
configuration:
# directories: PVC system directories
directories:
# plugin_directory: Directory containing node monitoring plugins
plugin_directory: "/usr/share/pvc/plugins"
# dynamic_directory: Temporary in-memory directory for active configurations
dynamic_directory: "/run/pvc"
# log_directory: Logging directory

View File

@ -27,6 +27,7 @@ import pvcnoded.util.services
import pvcnoded.util.libvirt
import pvcnoded.util.zookeeper
import pvcnoded.objects.MonitoringInstance as MonitoringInstance
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
@ -58,6 +59,7 @@ version = "0.9.61"
def entrypoint():
keepalive_timer = None
monitoring_instance = None
# Get our configuration
config = pvcnoded.util.config.get_configuration()
@ -204,7 +206,7 @@ def entrypoint():
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
logger.out("Terminating pvcnoded and cleaning up", state="s")
@ -253,6 +255,13 @@ def entrypoint():
except Exception:
pass
# Clean up any monitoring plugins that have cleanup
try:
logger.out("Performing monitoring plugin cleanup", state="s")
monitoring_instance.run_cleanups()
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@ -1015,9 +1024,14 @@ def entrypoint():
state="i",
)
# Set up the node monitoring instance
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node
logger, config, zkhandler, this_node, monitoring_instance
)
# Tick loop; does nothing since everything is async

View File

@ -0,0 +1,357 @@
#!/usr/bin/env python3
# PluginInstance.py - Class implementing a PVC monitoring instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import concurrent.futures
import time
import importlib.util
from os import walk
from datetime import datetime
class PluginResult(object):
def __init__(self, zkhandler, config, logger, this_node, plugin_name):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.plugin_name = plugin_name
self.current_time = int(time.time())
self.health_delta = 0
self.message = None
self.data = None
self.runtime = "0.00"
def set_health_delta(self, new_delta):
self.health_delta = new_delta
def set_message(self, new_message):
self.message = new_message
def set_data(self, new_data):
self.data = new_data
def set_runtime(self, new_runtime):
self.runtime = new_runtime
def to_zookeeper(self):
self.zkhandler.write(
[
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.name",
self.plugin_name,
),
self.plugin_name,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.last_run",
self.plugin_name,
),
self.current_time,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.health_delta",
self.plugin_name,
),
self.health_delta,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.message",
self.plugin_name,
),
self.message,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.data",
self.plugin_name,
),
self.data,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.runtime",
self.plugin_name,
),
self.runtime,
),
]
)
class MonitoringPlugin(object):
def __init__(self, zkhandler, config, logger, this_node, plugin_name):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.plugin_name = plugin_name
self.plugin_result = PluginResult(
self.zkhandler,
self.config,
self.logger,
self.this_node,
self.plugin_name,
)
#
# Helper functions; exposed to child MonitoringPluginScript instances
#
def log(self, message, state="d"):
"""
Log a message to the PVC logger instance using the plugin name as a prefix
Takes "state" values as defined by the PVC logger instance, defaulting to debug:
"d": debug
"i": informational
"t": tick/keepalive
"w": warning
"e": error
"""
if state == "d" and not self.config["debug"]:
return
self.logger.out(message, state=state, prefix=self.plugin_name)
#
# Primary class functions; implemented by the individual plugins
#
def setup(self):
"""
setup(): Perform setup of the plugin; run once during daemon startup
OPTIONAL
"""
pass
def run(self):
"""
run(): Run the plugin, returning a PluginResult object
"""
return self.plugin_result
def cleanup(self):
"""
cleanup(): Clean up after the plugin; run once during daemon shutdown
OPTIONAL
"""
pass
class MonitoringInstance(object):
def __init__(self, zkhandler, config, logger, this_node):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
# Get a list of plugins from the plugin_directory
plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
2
] # [] if no file
self.all_plugins = list()
self.all_plugin_names = list()
# Load each plugin file into the all_plugins list
for plugin_file in sorted(plugin_files):
try:
self.logger.out(
f"Loading monitoring plugin from {self.config['plugin_directory']}/{plugin_file}",
state="i",
)
loader = importlib.machinery.SourceFileLoader(
"plugin_script", f"{self.config['plugin_directory']}/{plugin_file}"
)
spec = importlib.util.spec_from_loader(loader.name, loader)
plugin_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plugin_script)
plugin = plugin_script.MonitoringPluginScript(
self.zkhandler,
self.config,
self.logger,
self.this_node,
plugin_script.PLUGIN_NAME,
)
self.all_plugins.append(plugin)
self.all_plugin_names.append(plugin.plugin_name)
# Create plugin key
self.zkhandler.write(
[
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.name",
plugin.plugin_name,
),
plugin.plugin_name,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.last_run",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.health_delta",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.message",
plugin.plugin_name,
),
"Initializing",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.data",
plugin.plugin_name,
),
None,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.runtime",
plugin.plugin_name,
),
"0.00",
),
]
)
self.logger.out(
f"Successfully loaded monitoring plugin '{plugin.plugin_name}'",
state="o",
)
except Exception as e:
self.logger.out(
f"Failed to load monitoring plugin: {e}",
state="w",
)
self.zkhandler.write(
[
(
("node.monitoring.plugins", self.this_node.name),
self.all_plugin_names,
),
]
)
# Clean up any old plugin data for which a plugin file no longer exists
for plugin_key in self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
):
if plugin_key not in self.all_plugin_names:
self.zkhandler.delete(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin",
plugin_key,
)
)
def run_plugin(self, plugin):
time_start = datetime.now()
result = plugin.run()
time_end = datetime.now()
time_delta = time_end - time_start
runtime = "{:0.02f}".format(time_delta.total_seconds())
result.set_runtime(runtime)
self.logger.out(
result.message, state="t", prefix=f"{plugin.plugin_name} ({runtime}s)"
)
result.to_zookeeper()
return result
def run_plugins(self):
total_health = 100
self.logger.out("Running monitoring plugins:", state="t")
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
executor.submit(self.run_plugin, plugin): plugin
for plugin in self.all_plugins
}
for future in concurrent.futures.as_completed(to_future_plugin_results):
plugin_results.append(future.result())
for result in plugin_results:
if result is not None:
total_health -= result.health_delta
if total_health > 90:
health_colour = self.logger.fmt_green
elif total_health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
self.logger.out(
f"System health: {health_colour}{total_health}/100{self.logger.fmt_end}",
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()
def run_cleanups(self):
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
executor.submit(self.run_cleanup, plugin): plugin
for plugin in self.all_plugins
}
for future in concurrent.futures.as_completed(to_future_plugin_results):
# This doesn't do anything, just lets us wait for them all to complete
pass

View File

@ -180,6 +180,9 @@ def get_configuration():
raise MalformedConfigurationError(e)
config_directories = {
"plugin_directory": o_directories.get(
"plugin_directory", "/usr/share/pvc/plugins"
),
"dynamic_directory": o_directories.get("dynamic_directory", None),
"log_directory": o_directories.get("log_directory", None),
"console_log_directory": o_directories.get("console_log_directory", None),

View File

@ -51,7 +51,7 @@ libvirt_vm_states = {
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
def start_keepalive_timer(logger, config, zkhandler, this_node, monitoring_instance):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
args=(logger, config, zkhandler, this_node, monitoring_instance),
trigger="interval",
seconds=keepalive_interval,
)
@ -648,7 +648,7 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
debug = config["debug"]
if debug:
logger.out("Keepalive starting", state="d", prefix="main-thread")
@ -918,5 +918,7 @@ def node_keepalive(logger, config, zkhandler, this_node):
[(("node.state.daemon", node_name), "dead")]
)
monitoring_instance.run_plugins()
if debug:
logger.out("Keepalive finished", state="d", prefix="main-thread")