From 9764090d6d88a4ecbd050458c04f7fa536131c53 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Tue, 1 Jun 2021 12:17:25 -0400 Subject: [PATCH] Merge node common with daemon common --- daemon-common/common.py | 162 ++++++++-- node-daemon/pvcnoded/CephInstance.py | 2 +- node-daemon/pvcnoded/DNSAggregatorInstance.py | 2 +- node-daemon/pvcnoded/Daemon.py | 4 +- node-daemon/pvcnoded/NodeInstance.py | 2 +- node-daemon/pvcnoded/VMInstance.py | 2 +- node-daemon/pvcnoded/VXNetworkInstance.py | 6 +- node-daemon/pvcnoded/common.py | 301 ------------------ node-daemon/pvcnoded/fencing.py | 2 +- 9 files changed, 146 insertions(+), 337 deletions(-) delete mode 100644 node-daemon/pvcnoded/common.py diff --git a/daemon-common/common.py b/daemon-common/common.py index a6907669..25787f25 100644 --- a/daemon-common/common.py +++ b/daemon-common/common.py @@ -22,45 +22,99 @@ import time import uuid import lxml -import shlex import subprocess +import signal from json import loads from re import match as re_match - from distutils.util import strtobool +from threading import Thread +from shlex import split as shlex_split + ############################################################################### # Supplemental functions ############################################################################### +# +# Run a local OS daemon in the background +# +class OSDaemon(object): + def __init__(self, command_string, environment, logfile): + command = shlex_split(command_string) + # Set stdout to be a logfile if set + if logfile: + stdout = open(logfile, 'a') + else: + stdout = subprocess.PIPE + + # Invoke the process + self.proc = subprocess.Popen( + command, + env=environment, + stdout=stdout, + stderr=stdout, + ) + + # Signal the process + def signal(self, sent_signal): + signal_map = { + 'hup': signal.SIGHUP, + 'int': signal.SIGINT, + 'term': signal.SIGTERM, + 'kill': signal.SIGKILL + } + self.proc.send_signal(signal_map[sent_signal]) + + +def run_os_daemon(command_string, environment=None, logfile=None): + daemon = OSDaemon(command_string, environment, logfile) + return daemon + # # Run a local OS command via shell # -def run_os_command(command_string, background=False, environment=None, timeout=None, shell=False): - command = shlex.split(command_string) - try: - command_output = subprocess.run( - command, - shell=shell, - env=environment, - timeout=timeout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - retcode = command_output.returncode - except subprocess.TimeoutExpired: - retcode = 128 +def run_os_command(command_string, background=False, environment=None, timeout=None): + command = shlex_split(command_string) + if background: + def runcmd(): + try: + subprocess.run( + command, + env=environment, + timeout=timeout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except subprocess.TimeoutExpired: + pass + thread = Thread(target=runcmd, args=()) + thread.start() + return 0, None, None + else: + try: + command_output = subprocess.run( + command, + env=environment, + timeout=timeout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + retcode = command_output.returncode + except subprocess.TimeoutExpired: + retcode = 128 + except Exception: + retcode = 255 - try: - stdout = command_output.stdout.decode('ascii') - except Exception: - stdout = '' - try: - stderr = command_output.stderr.decode('ascii') - except Exception: - stderr = '' - return retcode, stdout, stderr + try: + stdout = command_output.stdout.decode('ascii') + except Exception: + stdout = '' + try: + stderr = command_output.stderr.decode('ascii') + except Exception: + stderr = '' + return retcode, stdout, stderr # @@ -449,7 +503,9 @@ def findTargetNode(zkhandler, dom_uuid): return None +# # Get the list of valid target nodes +# def getNodes(zkhandler, node_limit, dom_uuid): valid_node_list = [] full_node_list = zkhandler.children('/nodes') @@ -473,7 +529,9 @@ def getNodes(zkhandler, node_limit, dom_uuid): return valid_node_list +# # via free memory (relative to allocated memory) +# def findTargetNodeMem(zkhandler, node_limit, dom_uuid): most_provfree = 0 target_node = None @@ -493,7 +551,9 @@ def findTargetNodeMem(zkhandler, node_limit, dom_uuid): return target_node +# # via load average +# def findTargetNodeLoad(zkhandler, node_limit, dom_uuid): least_load = 9999.0 target_node = None @@ -509,7 +569,9 @@ def findTargetNodeLoad(zkhandler, node_limit, dom_uuid): return target_node +# # via total vCPUs +# def findTargetNodeVCPUs(zkhandler, node_limit, dom_uuid): least_vcpus = 9999 target_node = None @@ -525,7 +587,9 @@ def findTargetNodeVCPUs(zkhandler, node_limit, dom_uuid): return target_node +# # via total VMs +# def findTargetNodeVMs(zkhandler, node_limit, dom_uuid): least_vms = 9999 target_node = None @@ -541,7 +605,9 @@ def findTargetNodeVMs(zkhandler, node_limit, dom_uuid): return target_node -# Connect to the primary host and run a command +# +# Connect to the primary node and run a command +# def runRemoteCommand(node, command, become=False): import paramiko import hashlib @@ -571,3 +637,47 @@ def runRemoteCommand(node, command, become=False): ssh_client.connect(node) stdin, stdout, stderr = ssh_client.exec_command(command) return stdout.read().decode('ascii').rstrip(), stderr.read().decode('ascii').rstrip() + + +# +# Reload the firewall rules of the system +# +def reload_firewall_rules(rules_file, logger=None): + if logger is not None: + logger.out('Reloading firewall configuration', state='o') + + retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file)) + if retcode != 0 and logger is not None: + logger.out('Failed to reload configuration: {}'.format(stderr), state='e') + + +# +# Create an IP address +# +def createIPAddress(ipaddr, cidrnetmask, dev): + run_os_command( + 'ip address add {}/{} dev {}'.format( + ipaddr, + cidrnetmask, + dev + ) + ) + run_os_command( + 'arping -P -U -W 0.02 -c 2 -i {dev} -S {ip} {ip}'.format( + dev=dev, + ip=ipaddr + ) + ) + + +# +# Remove an IP address +# +def removeIPAddress(ipaddr, cidrnetmask, dev): + run_os_command( + 'ip address delete {}/{} dev {}'.format( + ipaddr, + cidrnetmask, + dev + ) + ) diff --git a/node-daemon/pvcnoded/CephInstance.py b/node-daemon/pvcnoded/CephInstance.py index daff4c6b..ac9aefb0 100644 --- a/node-daemon/pvcnoded/CephInstance.py +++ b/node-daemon/pvcnoded/CephInstance.py @@ -23,7 +23,7 @@ import time import json import psutil -import pvcnoded.common as common +import daemon_lib.common as common class CephOSDInstance(object): diff --git a/node-daemon/pvcnoded/DNSAggregatorInstance.py b/node-daemon/pvcnoded/DNSAggregatorInstance.py index 0e11c9e8..cc32cd45 100644 --- a/node-daemon/pvcnoded/DNSAggregatorInstance.py +++ b/node-daemon/pvcnoded/DNSAggregatorInstance.py @@ -26,7 +26,7 @@ import psycopg2 from threading import Thread, Event -import pvcnoded.common as common +import daemon_lib.common as common class DNSAggregatorInstance(object): diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 39b099b6..4e8735bd 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -44,7 +44,7 @@ from daemon_lib.zkhandler import ZKHandler import pvcnoded.log as log import pvcnoded.fencing as fencing -import pvcnoded.common as common +import daemon_lib.common as common import pvcnoded.VMInstance as VMInstance import pvcnoded.NodeInstance as NodeInstance @@ -782,7 +782,7 @@ if enable_networking: nftables_base_filename = '{}/base.nft'.format(config['nft_dynamic_directory']) with open(nftables_base_filename, 'w') as nfbasefile: nfbasefile.write(nftables_base_rules) - common.reload_firewall_rules(logger, nftables_base_filename) + common.reload_firewall_rules(nftables_base_filename, logger=logger) ############################################################################### # PHASE 7d - Ensure DNSMASQ is not running diff --git a/node-daemon/pvcnoded/NodeInstance.py b/node-daemon/pvcnoded/NodeInstance.py index e28a5f86..bcf5e711 100644 --- a/node-daemon/pvcnoded/NodeInstance.py +++ b/node-daemon/pvcnoded/NodeInstance.py @@ -23,7 +23,7 @@ import time from threading import Thread -import pvcnoded.common as common +import daemon_lib.common as common class NodeInstance(object): diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py index 8175de9b..52a7cd0a 100644 --- a/node-daemon/pvcnoded/VMInstance.py +++ b/node-daemon/pvcnoded/VMInstance.py @@ -28,7 +28,7 @@ from threading import Thread from xml.etree import ElementTree -import pvcnoded.common as common +import daemon_lib.common as common import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance diff --git a/node-daemon/pvcnoded/VXNetworkInstance.py b/node-daemon/pvcnoded/VXNetworkInstance.py index 6fd8648e..bcabcfc7 100644 --- a/node-daemon/pvcnoded/VXNetworkInstance.py +++ b/node-daemon/pvcnoded/VXNetworkInstance.py @@ -24,7 +24,7 @@ import time from textwrap import dedent -import pvcnoded.common as common +import daemon_lib.common as common class VXNetworkInstance(object): @@ -452,7 +452,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out # Reload firewall rules nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory']) - common.reload_firewall_rules(self.logger, nftables_base_filename) + common.reload_firewall_rules(nftables_base_filename, logger=self.logger) # Create bridged network configuration def createNetworkBridged(self): @@ -798,7 +798,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out # Reload firewall rules nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory']) - common.reload_firewall_rules(self.logger, nftables_base_filename) + common.reload_firewall_rules(nftables_base_filename, logger=self.logger) def removeGateways(self): if self.nettype == 'managed': diff --git a/node-daemon/pvcnoded/common.py b/node-daemon/pvcnoded/common.py deleted file mode 100644 index 74fd8f29..00000000 --- a/node-daemon/pvcnoded/common.py +++ /dev/null @@ -1,301 +0,0 @@ -#!/usr/bin/env python3 - -# common.py - PVC daemon function library, common fuctions -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2021 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import subprocess -import signal - -from threading import Thread -from shlex import split as shlex_split - - -class OSDaemon(object): - def __init__(self, command_string, environment, logfile): - command = shlex_split(command_string) - # Set stdout to be a logfile if set - if logfile: - stdout = open(logfile, 'a') - else: - stdout = subprocess.PIPE - - # Invoke the process - self.proc = subprocess.Popen( - command, - env=environment, - stdout=stdout, - stderr=stdout, - ) - - # Signal the process - def signal(self, sent_signal): - signal_map = { - 'hup': signal.SIGHUP, - 'int': signal.SIGINT, - 'term': signal.SIGTERM, - 'kill': signal.SIGKILL - } - self.proc.send_signal(signal_map[sent_signal]) - - -def run_os_daemon(command_string, environment=None, logfile=None): - daemon = OSDaemon(command_string, environment, logfile) - return daemon - - -# Run a oneshot command, optionally without blocking -def run_os_command(command_string, background=False, environment=None, timeout=None): - command = shlex_split(command_string) - if background: - def runcmd(): - try: - subprocess.run( - command, - env=environment, - timeout=timeout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - except subprocess.TimeoutExpired: - pass - thread = Thread(target=runcmd, args=()) - thread.start() - return 0, None, None - else: - try: - command_output = subprocess.run( - command, - env=environment, - timeout=timeout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - retcode = command_output.returncode - except subprocess.TimeoutExpired: - retcode = 128 - except Exception: - retcode = 255 - - try: - stdout = command_output.stdout.decode('ascii') - except Exception: - stdout = '' - try: - stderr = command_output.stderr.decode('ascii') - except Exception: - stderr = '' - return retcode, stdout, stderr - - -# Reload the firewall rules of the system -def reload_firewall_rules(logger, rules_file): - logger.out('Reloading firewall configuration', state='o') - retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file)) - if retcode != 0: - logger.out('Failed to reload configuration: {}'.format(stderr), state='e') - - -# Create IP address -def createIPAddress(ipaddr, cidrnetmask, dev): - run_os_command( - 'ip address add {}/{} dev {}'.format( - ipaddr, - cidrnetmask, - dev - ) - ) - run_os_command( - 'arping -P -U -W 0.02 -c 2 -i {dev} -S {ip} {ip}'.format( - dev=dev, - ip=ipaddr - ) - ) - - -# Remove IP address -def removeIPAddress(ipaddr, cidrnetmask, dev): - run_os_command( - 'ip address delete {}/{} dev {}'.format( - ipaddr, - cidrnetmask, - dev - ) - ) - - -# -# Find a migration target -# -def findTargetNode(zkhandler, config, logger, dom_uuid): - # Determine VM node limits; set config value if read fails - try: - node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid)).split(',') - if not any(node_limit): - node_limit = '' - except Exception: - node_limit = '' - zkhandler.write([ - ('/domains/{}/node_limit'.format(dom_uuid), '') - ]) - - # Determine VM search field - try: - search_field = zkhandler.read('/domains/{}/node_selector'.format(dom_uuid)) - except Exception: - search_field = None - - # If our search field is invalid, use the default - if search_field is None or search_field == 'None': - search_field = zkhandler.read('/config/migration_target_selector') - - if config['debug']: - logger.out('Migrating VM {} with selector {}'.format(dom_uuid, search_field), state='d', prefix='node-flush') - - # Execute the search - if search_field == 'mem': - return findTargetNodeMem(zkhandler, config, logger, node_limit, dom_uuid) - if search_field == 'load': - return findTargetNodeLoad(zkhandler, config, logger, node_limit, dom_uuid) - if search_field == 'vcpus': - return findTargetNodeVCPUs(zkhandler, config, logger, node_limit, dom_uuid) - if search_field == 'vms': - return findTargetNodeVMs(zkhandler, config, logger, node_limit, dom_uuid) - - # Nothing was found - return None - - -# Get the list of valid target nodes -def getNodes(zkhandler, node_limit, dom_uuid): - valid_node_list = [] - full_node_list = zkhandler.children('/nodes') - current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid)) - - for node in full_node_list: - if node_limit and node not in node_limit: - continue - - daemon_state = zkhandler.read('/nodes/{}/daemonstate'.format(node)) - domain_state = zkhandler.read('/nodes/{}/domainstate'.format(node)) - - if node == current_node: - continue - - if daemon_state != 'run' or domain_state != 'ready': - continue - - valid_node_list.append(node) - - return valid_node_list - - -# via free memory (relative to allocated memory) -def findTargetNodeMem(zkhandler, config, logger, node_limit, dom_uuid): - most_provfree = 0 - target_node = None - - node_list = getNodes(zkhandler, node_limit, dom_uuid) - if config['debug']: - logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush') - - for node in node_list: - memprov = int(zkhandler.read('/nodes/{}/memprov'.format(node))) - memused = int(zkhandler.read('/nodes/{}/memused'.format(node))) - memfree = int(zkhandler.read('/nodes/{}/memfree'.format(node))) - memtotal = memused + memfree - provfree = memtotal - memprov - - if config['debug']: - logger.out('Evaluating node {} with {} provfree'.format(node, provfree), state='d', prefix='node-flush') - if provfree > most_provfree: - most_provfree = provfree - target_node = node - - if config['debug']: - logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush') - return target_node - - -# via load average -def findTargetNodeLoad(zkhandler, config, logger, node_limit, dom_uuid): - least_load = 9999.0 - target_node = None - - node_list = getNodes(zkhandler, node_limit, dom_uuid) - if config['debug']: - logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush') - - for node in node_list: - load = float(zkhandler.read('/nodes/{}/cpuload'.format(node))) - - if config['debug']: - logger.out('Evaluating node {} with load {}'.format(node, load), state='d', prefix='node-flush') - if load < least_load: - least_load = load - target_node = node - - if config['debug']: - logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush') - return target_node - - -# via total vCPUs -def findTargetNodeVCPUs(zkhandler, config, logger, node_limit, dom_uuid): - least_vcpus = 9999 - target_node = None - - node_list = getNodes(zkhandler, node_limit, dom_uuid) - if config['debug']: - logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush') - - for node in node_list: - vcpus = int(zkhandler.read('/nodes/{}/vcpualloc'.format(node))) - - if config['debug']: - logger.out('Evaluating node {} with vcpualloc {}'.format(node, vcpus), state='d', prefix='node-flush') - if vcpus < least_vcpus: - least_vcpus = vcpus - target_node = node - - if config['debug']: - logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush') - return target_node - - -# via total VMs -def findTargetNodeVMs(zkhandler, config, logger, node_limit, dom_uuid): - least_vms = 9999 - target_node = None - - node_list = getNodes(zkhandler, node_limit, dom_uuid) - if config['debug']: - logger.out('Found nodes: {}'.format(node_list), state='d', prefix='node-flush') - - for node in node_list: - vms = int(zkhandler.read('/nodes/{}/domainscount'.format(node))) - - if config['debug']: - logger.out('Evaluating node {} with VM count {}'.format(node, vms), state='d', prefix='node-flush') - if vms < least_vms: - least_vms = vms - target_node = node - - if config['debug']: - logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush') - return target_node diff --git a/node-daemon/pvcnoded/fencing.py b/node-daemon/pvcnoded/fencing.py index 2e1c41be..8e15848e 100644 --- a/node-daemon/pvcnoded/fencing.py +++ b/node-daemon/pvcnoded/fencing.py @@ -21,7 +21,7 @@ import time -import pvcnoded.common as common +import daemon_lib.common as common import pvcnoded.VMInstance as VMInstance