From f8fe0d073bbbf5b28c4c35829b3dec861612f3b5 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Mon, 10 Sep 2018 01:12:17 -0400 Subject: [PATCH] Move daemon into module file; add stub launcher --- pvcd.py | 252 +-------------------------------------------- pvcd/Daemon.py | 271 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 273 insertions(+), 250 deletions(-) create mode 100644 pvcd/Daemon.py diff --git a/pvcd.py b/pvcd.py index 1554fafc..fbe08289 100755 --- a/pvcd.py +++ b/pvcd.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# pvcd.py - PVC hypervisor node daemon +# pvcd.py - Daemon startup stub # Part of the Parallel Virtual Cluster (PVC) system # # Copyright (C) 2018 Joshua M. Boniface @@ -20,252 +20,4 @@ # ############################################################################### -import kazoo.client -import libvirt -import sys -import os -import signal -import socket -import psutil -import subprocess -import uuid -import time -import configparser -import apscheduler.schedulers.background - -import pvcd.ansiiprint as ansiiprint -import pvcd.zkhandler as zkhandler -import pvcd.VMInstance as VMInstance -import pvcd.NodeInstance as NodeInstance - -print(ansiiprint.bold() + "pvcd - Parallel Virtual Cluster management daemon" + ansiiprint.end()) - -# Get the config file variable from the environment -try: - pvcd_config_file = os.environ['PVCD_CONFIG_FILE'] -except: - print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcd.') - exit(1) - -myhostname = socket.gethostname() -myshorthostname = myhostname.split('.', 1)[0] -mydomainname = ''.join(myhostname.split('.', 1)[1:]) - -# Config values dictionary -config_values = [ - 'zookeeper', - 'keepalive_interval', - 'fence_intervals', - 'suicide_intervals', - 'successful_fence', - 'failed_fence', - 'migration_target_selector', - 'ipmi_hostname', - 'ipmi_username', - 'ipmi_password' -] -def readConfig(pvcd_config_file, myhostname): - print('Loading configuration from file {}'.format(pvcd_config_file)) - - o_config = configparser.ConfigParser() - o_config.read(pvcd_config_file) - config = {} - - try: - entries = o_config[myhostname] - except: - try: - entries = o_config['default'] - except Exception as e: - print('ERROR: Config file is not valid!') - exit(1) - - for entry in config_values: - try: - config[entry] = entries[entry] - except: - try: - config[entry] = o_config['default'][entry] - except: - print('ERROR: Config file missing required value "{}" for this host!'.format(entry)) - exit(1) - - # Handle an empty ipmi_hostname - if config['ipmi_hostname'] == '': - config['ipmi_hostname'] = myshorthostname + '-lom.' + mydomainname - - return config - -# Get config -config = readConfig(pvcd_config_file, myhostname) - -# Check that libvirtd is listening TCP -libvirt_check_name = "qemu+tcp://127.0.0.1:16509/system" -try: - print('Connecting to Libvirt instance at {}'.format(libvirt_check_name)) - lv_conn = libvirt.open(libvirt_check_name) - if lv_conn == None: - raise -except: - print('ERROR: Failed to open local libvirt connection via TCP; required for PVC!') - exit(1) -lv_conn.close() - -# Connect to local zookeeper -zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) -try: - print('Connecting to Zookeeper instance at {}'.format(config['zookeeper'])) - zk_conn.start() -except: - print('ERROR: Failed to connect to Zookeeper') - exit(1) - -# Handle zookeeper failures -def zk_listener(state): - global zk_conn, update_timer - if state == kazoo.client.KazooState.SUSPENDED: - ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e') - - # Stop keepalive thread - stopKeepaliveTimer(update_timer) - - while True: - _zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) - try: - _zk_conn.start() - zk_conn = _zk_conn - break - except: - time.sleep(1) - elif state == kazoo.client.KazooState.CONNECTED: - ansiiprint.echo('Connection to Zookeeper started', '', 'o') - - # Start keepalive thread - update_timer = createKeepaliveTimer() - else: - pass - -zk_conn.add_listener(zk_listener) - -# Cleanup function -def cleanup(signum, frame): - ansiiprint.echo('Terminating daemon', '', 'e') - # Set stop state in Zookeeper - zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' }) - # Close the Zookeeper connection - try: - zk_conn.stop() - zk_conn.close() - except: - pass - # Stop keepalive thread - stopKeepaliveTimer(update_timer) - # Exit - sys.exit(0) - -# Handle signals gracefully -signal.signal(signal.SIGTERM, cleanup) -signal.signal(signal.SIGINT, cleanup) -signal.signal(signal.SIGQUIT, cleanup) - -# Gather useful data about our host for staticdata -# Static data format: 'cpu_count', 'arch', 'os', 'kernel' -staticdata = [] -staticdata.append(str(psutil.cpu_count())) -staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) -staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) -staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) -# Print static data on start - -print('{0}Node hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), myhostname)) -print('{0}IPMI hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), config['ipmi_hostname'])) -print('{0}Machine details:{1}'.format(ansiiprint.bold(), ansiiprint.end())) -print(' {0}CPUs:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[0])) -print(' {0}Arch:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[1])) -print(' {0}OS:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[2])) -print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[3])) - -# Check if our node exists in Zookeeper, and create it if not -if zk_conn.exists('/nodes/{}'.format(myhostname)): - print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") - # Update static data just in case it's changed - zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) }) -else: - print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node") - keepalive_time = int(time.time()) - transaction = zk_conn.transaction() - transaction.create('/nodes/{}'.format(myhostname), 'hypervisor'.encode('ascii')) - # Basic state information - transaction.create('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii')) - transaction.create('/nodes/{}/domainstate'.format(myhostname), 'ready'.encode('ascii')) - transaction.create('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii')) - transaction.create('/nodes/{}/memfree'.format(myhostname), '0'.encode('ascii')) - transaction.create('/nodes/{}/memused'.format(myhostname), '0'.encode('ascii')) - transaction.create('/nodes/{}/memalloc'.format(myhostname), '0'.encode('ascii')) - transaction.create('/nodes/{}/vcpualloc'.format(myhostname), '0'.encode('ascii')) - transaction.create('/nodes/{}/cpuload'.format(myhostname), '0.0'.encode('ascii')) - transaction.create('/nodes/{}/runningdomains'.format(myhostname), ''.encode('ascii')) - transaction.create('/nodes/{}/domainscount'.format(myhostname), '0'.encode('ascii')) - # Keepalives and fencing information - transaction.create('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time).encode('ascii')) - transaction.create('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname'].encode('ascii')) - transaction.create('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username'].encode('ascii')) - transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) - transaction.commit() - -zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' }) - -t_node = dict() -s_domain = dict() -node_list = [] -domain_list = [] - -@zk_conn.ChildrenWatch('/nodes') -def updatenodes(new_node_list): - global node_list - node_list = new_node_list - print(ansiiprint.blue() + 'Node list: ' + ansiiprint.end() + '{}'.format(' '.join(node_list))) - for node in node_list: - if node in t_node: - t_node[node].updatenodelist(t_node) - else: - t_node[node] = NodeInstance.NodeInstance(myhostname, node, t_node, s_domain, zk_conn, config) - -@zk_conn.ChildrenWatch('/domains') -def updatedomains(new_domain_list): - global domain_list - domain_list = new_domain_list - print(ansiiprint.blue() + 'Domain list: ' + ansiiprint.end() + '{}'.format(' '.join(domain_list))) - for domain in domain_list: - if not domain in s_domain: - s_domain[domain] = VMInstance.VMInstance(domain, zk_conn, config, t_node[myhostname]); - for node in node_list: - if node in t_node: - t_node[node].updatedomainlist(s_domain) - -# Set up our update function -this_node = t_node[myhostname] -update_zookeeper = this_node.update_zookeeper - -# Create timer to update this node in Zookeeper -def createKeepaliveTimer(): - interval = int(config['keepalive_interval']) - ansiiprint.echo('Starting keepalive timer ({} second interval)'.format(interval), '', 'o') - update_timer = apscheduler.schedulers.background.BackgroundScheduler() - update_timer.add_job(update_zookeeper, 'interval', seconds=interval) - update_timer.start() - return update_timer - -def stopKeepaliveTimer(update_timer): - ansiiprint.echo('Stopping keepalive timer', '', 'c') - update_timer.shutdown() - -# Start keepalive thread -update_timer = createKeepaliveTimer() - -# Tick loop -while True: - try: - time.sleep(0.1) - except: - break +import pvcd.Daemon diff --git a/pvcd/Daemon.py b/pvcd/Daemon.py new file mode 100644 index 00000000..5a339157 --- /dev/null +++ b/pvcd/Daemon.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 + +# Daemon.py - PVC hypervisor node daemon +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +import kazoo.client +import libvirt +import sys +import os +import signal +import socket +import psutil +import subprocess +import uuid +import time +import configparser +import apscheduler.schedulers.background + +import pvcd.ansiiprint as ansiiprint +import pvcd.zkhandler as zkhandler +import pvcd.VMInstance as VMInstance +import pvcd.NodeInstance as NodeInstance + +print(ansiiprint.bold() + "pvcd - Parallel Virtual Cluster management daemon" + ansiiprint.end()) + +# Get the config file variable from the environment +try: + pvcd_config_file = os.environ['PVCD_CONFIG_FILE'] +except: + print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcd.') + exit(1) + +myhostname = socket.gethostname() +myshorthostname = myhostname.split('.', 1)[0] +mydomainname = ''.join(myhostname.split('.', 1)[1:]) + +# Config values dictionary +config_values = [ + 'zookeeper', + 'keepalive_interval', + 'fence_intervals', + 'suicide_intervals', + 'successful_fence', + 'failed_fence', + 'migration_target_selector', + 'ipmi_hostname', + 'ipmi_username', + 'ipmi_password' +] +def readConfig(pvcd_config_file, myhostname): + print('Loading configuration from file {}'.format(pvcd_config_file)) + + o_config = configparser.ConfigParser() + o_config.read(pvcd_config_file) + config = {} + + try: + entries = o_config[myhostname] + except: + try: + entries = o_config['default'] + except Exception as e: + print('ERROR: Config file is not valid!') + exit(1) + + for entry in config_values: + try: + config[entry] = entries[entry] + except: + try: + config[entry] = o_config['default'][entry] + except: + print('ERROR: Config file missing required value "{}" for this host!'.format(entry)) + exit(1) + + # Handle an empty ipmi_hostname + if config['ipmi_hostname'] == '': + config['ipmi_hostname'] = myshorthostname + '-lom.' + mydomainname + + return config + +# Get config +config = readConfig(pvcd_config_file, myhostname) + +# Check that libvirtd is listening TCP +libvirt_check_name = "qemu+tcp://127.0.0.1:16509/system" +try: + print('Connecting to Libvirt instance at {}'.format(libvirt_check_name)) + lv_conn = libvirt.open(libvirt_check_name) + if lv_conn == None: + raise +except: + print('ERROR: Failed to open local libvirt connection via TCP; required for PVC!') + exit(1) +lv_conn.close() + +# Connect to local zookeeper +zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) +try: + print('Connecting to Zookeeper instance at {}'.format(config['zookeeper'])) + zk_conn.start() +except: + print('ERROR: Failed to connect to Zookeeper') + exit(1) + +# Handle zookeeper failures +def zk_listener(state): + global zk_conn, update_timer + if state == kazoo.client.KazooState.SUSPENDED: + ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e') + + # Stop keepalive thread + stopKeepaliveTimer(update_timer) + + while True: + _zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper']) + try: + _zk_conn.start() + zk_conn = _zk_conn + break + except: + time.sleep(1) + elif state == kazoo.client.KazooState.CONNECTED: + ansiiprint.echo('Connection to Zookeeper started', '', 'o') + + # Start keepalive thread + update_timer = createKeepaliveTimer() + else: + pass + +zk_conn.add_listener(zk_listener) + +# Cleanup function +def cleanup(signum, frame): + ansiiprint.echo('Terminating daemon', '', 'e') + # Set stop state in Zookeeper + zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' }) + # Close the Zookeeper connection + try: + zk_conn.stop() + zk_conn.close() + except: + pass + # Stop keepalive thread + stopKeepaliveTimer(update_timer) + # Exit + sys.exit(0) + +# Handle signals gracefully +signal.signal(signal.SIGTERM, cleanup) +signal.signal(signal.SIGINT, cleanup) +signal.signal(signal.SIGQUIT, cleanup) + +# Gather useful data about our host for staticdata +# Static data format: 'cpu_count', 'arch', 'os', 'kernel' +staticdata = [] +staticdata.append(str(psutil.cpu_count())) +staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) +staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) +staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip()) +# Print static data on start + +print('{0}Node hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), myhostname)) +print('{0}IPMI hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), config['ipmi_hostname'])) +print('{0}Machine details:{1}'.format(ansiiprint.bold(), ansiiprint.end())) +print(' {0}CPUs:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[0])) +print(' {0}Arch:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[1])) +print(' {0}OS:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[2])) +print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[3])) + +# Check if our node exists in Zookeeper, and create it if not +if zk_conn.exists('/nodes/{}'.format(myhostname)): + print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") + # Update static data just in case it's changed + zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) }) +else: + print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node") + keepalive_time = int(time.time()) + transaction = zk_conn.transaction() + transaction.create('/nodes/{}'.format(myhostname), 'hypervisor'.encode('ascii')) + # Basic state information + transaction.create('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii')) + transaction.create('/nodes/{}/domainstate'.format(myhostname), 'ready'.encode('ascii')) + transaction.create('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii')) + transaction.create('/nodes/{}/memfree'.format(myhostname), '0'.encode('ascii')) + transaction.create('/nodes/{}/memused'.format(myhostname), '0'.encode('ascii')) + transaction.create('/nodes/{}/memalloc'.format(myhostname), '0'.encode('ascii')) + transaction.create('/nodes/{}/vcpualloc'.format(myhostname), '0'.encode('ascii')) + transaction.create('/nodes/{}/cpuload'.format(myhostname), '0.0'.encode('ascii')) + transaction.create('/nodes/{}/runningdomains'.format(myhostname), ''.encode('ascii')) + transaction.create('/nodes/{}/domainscount'.format(myhostname), '0'.encode('ascii')) + # Keepalives and fencing information + transaction.create('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time).encode('ascii')) + transaction.create('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname'].encode('ascii')) + transaction.create('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username'].encode('ascii')) + transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) + transaction.commit() + +zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' }) + +t_node = dict() +s_domain = dict() +node_list = [] +domain_list = [] + +@zk_conn.ChildrenWatch('/nodes') +def updatenodes(new_node_list): + global node_list + node_list = new_node_list + print(ansiiprint.blue() + 'Node list: ' + ansiiprint.end() + '{}'.format(' '.join(node_list))) + for node in node_list: + if node in t_node: + t_node[node].updatenodelist(t_node) + else: + t_node[node] = NodeInstance.NodeInstance(myhostname, node, t_node, s_domain, zk_conn, config) + +@zk_conn.ChildrenWatch('/domains') +def updatedomains(new_domain_list): + global domain_list + domain_list = new_domain_list + print(ansiiprint.blue() + 'Domain list: ' + ansiiprint.end() + '{}'.format(' '.join(domain_list))) + for domain in domain_list: + if not domain in s_domain: + s_domain[domain] = VMInstance.VMInstance(domain, zk_conn, config, t_node[myhostname]); + for node in node_list: + if node in t_node: + t_node[node].updatedomainlist(s_domain) + +# Set up our update function +this_node = t_node[myhostname] +update_zookeeper = this_node.update_zookeeper + +# Create timer to update this node in Zookeeper +def createKeepaliveTimer(): + interval = int(config['keepalive_interval']) + ansiiprint.echo('Starting keepalive timer ({} second interval)'.format(interval), '', 'o') + update_timer = apscheduler.schedulers.background.BackgroundScheduler() + update_timer.add_job(update_zookeeper, 'interval', seconds=interval) + update_timer.start() + return update_timer + +def stopKeepaliveTimer(update_timer): + ansiiprint.echo('Stopping keepalive timer', '', 'c') + update_timer.shutdown() + +# Start keepalive thread +update_timer = createKeepaliveTimer() + +# Tick loop +while True: + try: + time.sleep(0.1) + except: + break