Massive rejigger into single daemon

Completely restructure the daemon code to move the 4 discrete daemons
into a single daemon that can be run on every hypervisor. Introduce the
idea of a static list of "coordinator" nodes which are configured at
install time to run Zookeeper and FRR in router mode, and which are
allowed to take on client network management duties (gateway, DHCP, DNS,
etc.) while still being able to run VMs (i.e. no dedicated "router"
nodes are required).
2018-10-14 02:01:35 -04:00
parent 25df845769
commit f198f62563
57 changed files with 1726 additions and 2307 deletions
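At runtime the split between node roles is driven entirely by the static coordinators list: a node whose short hostname appears in that list starts Zookeeper and runs in coordinator mode, while every other node runs as a plain hypervisor. A minimal sketch of that decision, mirroring PHASE 2 of Daemon.py below (the hostnames here are only illustrative):

import socket

# Illustrative values; pvcd reads this list from the 'coordinators' key in pvcd.conf
coordinators = ['pvc-hv1', 'pvc-hv2', 'pvc-hv3']
myhostname = socket.gethostname().split('.', 1)[0]  # short hostname, as in Daemon.py

if myhostname in coordinators:
    daemon_mode = 'coordinator'  # joins the Zookeeper ensemble; may become the primary router
else:
    daemon_mode = 'hypervisor'   # runs VMs only, with no cluster coordination duties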


@@ -0,0 +1,66 @@
# pvcd cluster configuration file example
#
# This configuration file specifies details for this node in PVC. Multiple node
# blocks can be added but only the one matching the current system nodename will
# be used by the local daemon. The daemon has no built-in default values; the values in
# this sample configuration are considered the defaults and, with adjustment of the
# nodename section and coordinators list, can be used as-is on a Debian system.
#
# The following values are required for each node or in a default section:
# coordinators: A CSV list of the short hostnames of the coordinator nodes; these nodes become
# members of the Zookeeper cluster, can act as routers, and perform additional
# special functions in a cluster; ideally there are 3 coordinators, though 5
# coordinators are supported
# dynamic_directory: The ramdisk directory for PVC to store its dynamic configurations,
# usually under /run or /var/run
# log_directory: The logging directory, usually under /var/log
# file_logging: Whether to log the daemon to a file (pvc.log under log_directory) in addition to
# normal stdout printing
# keepalive_interval: the interval between keepalives and for dead node timeout (defaults to 5)
# fence_intervals: the number of keepalive_intervals without Zookeeper contact before this node
# will consider another node dead and fence it (defaults to 6, i.e. 30s)
# suicide_intervals: the number of keepalive_intervals without Zookeeper contact before this
# node will consider itself failed and terminate all running VMs (defaults
# to 0, i.e. disabled); should be less than "fence_intervals"
# successful_fence: the action to take on a successful fencing operation; can be "none" or
# "migrate" (defaults to "migrate")
# failed_fence: the action to take on a failed fencing operation; can be "none" or "migrate"
# (defaults to "none"); "migrate" requires "suicide_intervals" to be set
# NOTE: POTENTIALLY DANGEROUS - see README for details
# migration_target_selector: the method to use to select target nodes during a virtual machine
# flush action; can be "mem", "load", "vcpus", or "vms" (defaults
# to "mem"); the best choice based on this field is selected for
# each VM to be migrated
# The following values are required for each node specifically (usually node-unique):
# vni_dev: the lower-level network device to bind VNI traffic to
# vni_dev_ip: the IP address (in CIDR format) of the lower-level network device, used by frr
# to communicate between nodes and pass routes between them.
# storage_dev: the lower-level network device to bind storage traffic to
# storage_dev_ip: the IP address (in CIDR format) of the lower-level network device, used by
# Ceph for storage traffic (both monitor and OSD).
# ipmi_hostname: the IPMI hostname for fencing (defaults to <shortname>-lom.<domain>)
# ipmi_username: username to connect to IPMI
# ipmi_password: password to connect to IPMI
#
# Copy this example to /etc/pvc/pvcd.conf and edit to your needs
[default]
coordinators = pvc-hv1,pvc-hv2,pvc-hv3
dynamic_directory = /run/pvc
log_directory = /var/log/pvc
file_logging = True
keepalive_interval = 5
fence_intervals = 6
suicide_intervals = 0
successful_fence = migrate
failed_fence = none
migration_target_selector = mem
[pvc-hv1]
vni_dev = ens4
vni_dev_ip = 10.255.0.1/24
storage_dev = ens4
storage_dev_ip = 10.254.0.1/24
ipmi_username = admin
ipmi_password = Passw0rd
ipmi_hostname = pvc-hv1-lom
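For reference, the daemon resolves each value by first checking the section named after the node's short hostname and then falling back to [default]; a rough, simplified sketch of the lookup performed by readConfig() in Daemon.py (only a few keys shown):

import configparser
import socket

o_config = configparser.ConfigParser()
o_config.read('/etc/pvc/pvcd.conf')
myhostname = socket.gethostname().split('.', 1)[0]

config = {}
for key in ('coordinators', 'vni_dev', 'vni_dev_ip', 'ipmi_username'):
    # The per-node section wins; any key missing there falls back to [default]
    if myhostname in o_config and key in o_config[myhostname]:
        config[key] = o_config[myhostname][key]
    else:
        config[key] = o_config['default'][key]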

node-daemon/pvcd.py Executable file

@@ -0,0 +1,23 @@
#!/usr/bin/env python3
# pvcd.py - Node daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcd.Daemon

node-daemon/pvcd.service Normal file

@@ -0,0 +1,16 @@
# Parallel Virtual Cluster virtualization daemon unit file
[Unit]
Description = Parallel Virtual Cluster node daemon
After = network-online.target libvirtd.service zookeeper.service
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVCD_CONFIG_FILE=/etc/pvc/pvcd.conf
ExecStart = /usr/share/pvc/pvcd.py
KillSignal = SIGINT
Restart = on-failure
[Install]
WantedBy = multi-user.target

node-daemon/pvcd/Daemon.py Normal file

@@ -0,0 +1,571 @@
#!/usr/bin/env python3
# Daemon.py - Node daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
version = '0.4'
import kazoo.client
import libvirt
import sys
import os
import signal
import atexit
import socket
import psutil
import subprocess
import uuid
import time
import re
import configparser
import apscheduler.schedulers.background
import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.common as common
import pvcd.DomainInstance as DomainInstance
import pvcd.NodeInstance as NodeInstance
import pvcd.VXNetworkInstance as VXNetworkInstance
###############################################################################
# PVCD - node daemon startup program
###############################################################################
#
# The PVC daemon starts a node and configures all the required components for
# the node to run. It determines which of the 3 daemon modes it should be in
# during initial setup based on hostname and the config file, and then starts
# any required services. The 3 daemon modes are:
# * leader: the cluster leader, follows the Zookeeper leader
# * coordinator: a Zookeeper cluster member
# * hypervisor: a hypervisor without any cluster intelligence
#
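# Note that only 'coordinator' and 'hypervisor' are stored as a node's daemon_mode; the
# leader (primary) role is tracked separately through the /primary_node key and each
# node's routerstate (see NodeInstance.become_primary()).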
###############################################################################
###############################################################################
# Daemon functions
###############################################################################
# Create timer to update this node in Zookeeper
def startKeepaliveTimer():
global update_timer
interval = int(config['keepalive_interval'])
logger.out('Starting keepalive timer ({} second interval)'.format(interval), state='s')
update_timer.add_job(update_zookeeper, 'interval', seconds=interval)
update_timer.start()
def stopKeepaliveTimer():
global update_timer
try:
update_timer.shutdown()
logger.out('Stopping keepalive timer', state='s')
except:
pass
###############################################################################
# PHASE 1a - Configuration parsing
###############################################################################
# Get the config file variable from the environment
try:
pvcvd_config_file = os.environ['PVCD_CONFIG_FILE']
except:
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcd.')
exit(1)
# Set local hostname and domain variables
myfqdn = socket.gethostname()
#myfqdn = 'pvc-hv1.domain.net'
myhostname = myfqdn.split('.', 1)[0]
mydomainname = ''.join(myfqdn.split('.', 1)[1:])
mynodeid = re.findall(r'\d+', myhostname)[-1]
# Gather useful data about our host
# Static data format: 'cpu_count', 'kernel', 'os', 'arch'
staticdata = []
staticdata.append(str(psutil.cpu_count()))
staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
# Create our timer object
update_timer = apscheduler.schedulers.background.BackgroundScheduler()
# Config values dictionary
config_values = [
'coordinators',
'dynamic_directory',
'log_directory',
'file_logging',
'keepalive_interval',
'fence_intervals',
'suicide_intervals',
'successful_fence',
'failed_fence',
'migration_target_selector',
'vni_dev',
'vni_dev_ip',
'storage_dev',
'storage_dev_ip',
'ipmi_hostname',
'ipmi_username',
'ipmi_password'
]
# Read and parse the config file
def readConfig(pvcvd_config_file, myhostname):
print('Loading configuration from file "{}"'.format(pvcvd_config_file))
o_config = configparser.ConfigParser()
o_config.read(pvcvd_config_file)
config = {}
try:
entries = o_config[myhostname]
except:
try:
entries = o_config['default']
except Exception as e:
print('ERROR: Config file is not valid!')
exit(1)
for entry in config_values:
try:
config[entry] = entries[entry]
except:
try:
config[entry] = o_config['default'][entry]
except:
print('ERROR: Config file missing required value "{}" for this host!'.format(entry))
exit(1)
# Handle an empty ipmi_hostname
if config['ipmi_hostname'] == '':
config['ipmi_hostname'] = myhostname + '-lom.' + mydomainname
return config
# Get the config object from readConfig()
config = readConfig(pvcvd_config_file, myhostname)
###############################################################################
# PHASE 1b - Prepare filesystem directories
###############################################################################
# Define our dynamic directory schema
# <dynamic_directory>/
# dnsmasq/
# pdns/
# nft/
config['dnsmasq_dynamic_directory'] = config['dynamic_directory'] + '/dnsmasq'
config['pdns_dynamic_directory'] = config['dynamic_directory'] + '/pdns'
config['nft_dynamic_directory'] = config['dynamic_directory'] + '/nft'
# Create our dynamic directories if they don't exist
if not os.path.exists(config['dynamic_directory']):
os.makedirs(config['dynamic_directory'])
os.makedirs(config['dnsmasq_dynamic_directory'])
os.makedirs(config['pdns_dynamic_directory'])
os.makedirs(config['nft_dynamic_directory'])
# Define our log directory schema
# <log_directory>/
# dnsmasq/
# pdns/
# nft/
config['dnsmasq_log_directory'] = config['log_directory'] + '/dnsmasq'
config['pdns_log_directory'] = config['log_directory'] + '/pdns'
config['nft_log_directory'] = config['log_directory'] + '/nft'
# Create our log directories if they don't exist
if not os.path.exists(config['log_directory']):
os.makedirs(config['log_directory'])
os.makedirs(config['dnsmasq_log_directory'])
os.makedirs(config['pdns_log_directory'])
os.makedirs(config['nft_log_directory'])
###############################################################################
# PHASE 1c - Set up logging
###############################################################################
logger = log.Logger(config)
# Print our startup messages
logger.out('Parallel Virtual Cluster node daemon v{}'.format(version))
logger.out('FQDN: {}'.format(myfqdn))
logger.out('Host: {}'.format(myhostname))
logger.out('ID: {}'.format(mynodeid))
logger.out('IPMI hostname: {}'.format(config['ipmi_hostname']))
logger.out('Machine details:')
logger.out(' CPUs: {}'.format(staticdata[0]))
logger.out(' Arch: {}'.format(staticdata[3]))
logger.out(' OS: {}'.format(staticdata[2]))
logger.out(' Kernel: {}'.format(staticdata[1]))
logger.out('Starting pvcd on host {}'.format(myfqdn), state='s')
###############################################################################
# PHASE 2 - Determine coordinator mode and start Zookeeper on coordinators
###############################################################################
# Determine the list of coordinator hosts
coordinator_hosts = config['coordinators'].split(',')
if myhostname in coordinator_hosts:
# We are indeed a coordinator host
config['daemon_mode'] = 'coordinator'
# Start the zookeeper service using systemctl
logger.out('Node is a ' + logger.fmt_blue + 'coordinator' + logger.fmt_end +'; starting Zookeeper daemon', state='i')
common.run_os_command('systemctl start zookeeper.service')
time.sleep(1)
else:
config['daemon_mode'] = 'hypervisor'
###############################################################################
# PHASE 3 - Attempt to connect to the coordinators and start zookeeper client
###############################################################################
# Start the connection to the coordinators
zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
try:
logger.out('Connecting to Zookeeper cluster hosts {}'.format(config['coordinators']), state='i')
# Start connection
zk_conn.start()
except Exception as e:
logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
exit(1)
# Handle zookeeper failures
def zk_listener(state):
global zk_conn, update_timer
if state == kazoo.client.KazooState.SUSPENDED:
logger.out('Connection to Zookeeper lost; retrying', state='w')
# Stop keepalive thread
if update_timer:
stopKeepaliveTimer()
while True:
try:
zk_conn.start()
break
except:
time.sleep(1)
elif state == kazoo.client.KazooState.CONNECTED:
logger.out('Connection to Zookeeper restored', state='o')
# Start keepalive thread
if update_timer:
startKeepaliveTimer()
else:
pass
zk_conn.add_listener(zk_listener)
###############################################################################
# PHASE 4 - Gracefully handle termination
###############################################################################
# Cleanup function
def cleanup():
global zk_conn, update_timer
# Stop keepalive thread
stopKeepaliveTimer()
logger.out('Terminating pvcd and cleaning up', state='s')
# Set stop state in Zookeeper
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
# Force into secondary network state if needed
if this_node.name == this_node.primary_node:
zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
# Wait for things to flush
time.sleep(3)
# Close the Zookeeper connection
try:
zk_conn.stop()
zk_conn.close()
except:
pass
# Handle exit gracefully
atexit.register(cleanup)
# Termination function
def term(signum='', frame=''):
# Exit
sys.exit(0)
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
###############################################################################
# PHASE 5 - Prepare host in Zookeeper
###############################################################################
# Check if our node exists in Zookeeper, and create it if not
if zk_conn.exists('/nodes/{}'.format(myhostname)):
logger.out("Node is " + logger.fmt_green + "present" + logger.fmt_end + " in Zookeeper", state='i')
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' })
# Update static data just in case it's changed
zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) })
else:
logger.out("Node is " + logger.fmt_red + "absent" + logger.fmt_end + " in Zookeeper; adding new node", state='i')
keepalive_time = int(time.time())
transaction = zk_conn.transaction()
transaction.create('/nodes/{}'.format(myhostname), config['daemon_mode'].encode('ascii'))
# Basic state information
transaction.create('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode'].encode('ascii'))
transaction.create('/nodes/{}/daemonstate'.format(myhostname), 'init'.encode('ascii'))
transaction.create('/nodes/{}/routerstate'.format(myhostname), 'client'.encode('ascii'))
transaction.create('/nodes/{}/domainstate'.format(myhostname), 'flushed'.encode('ascii'))
transaction.create('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii'))
transaction.create('/nodes/{}/memfree'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/memused'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/memalloc'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/vcpualloc'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/cpuload'.format(myhostname), '0.0'.encode('ascii'))
transaction.create('/nodes/{}/networkscount'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/domainscount'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/runningdomains'.format(myhostname), ''.encode('ascii'))
# Keepalives and fencing information
transaction.create('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time).encode('ascii'))
transaction.create('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname'].encode('ascii'))
transaction.create('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username'].encode('ascii'))
transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii'))
transaction.commit()
# Check that the primary node key exists, and create it with us as primary if not
current_primary = zkhandler.readdata(zk_conn, '/primary_node')
if current_primary and current_primary != 'none':
logger.out('Current primary node is "{}{}{}".'.format(logger.fmt_blue, current_primary, logger.fmt_end), state='i')
else:
logger.out('No primary node key found; creating with us as primary.', state='i')
zkhandler.writedata(zk_conn, { '/primary_node': myhostname })
###############################################################################
# PHASE 6 - Create local IP addresses for VNI and Storage networks
###############################################################################
# VNI configuration
vni_dev = config['vni_dev']
vni_dev_ip = config['vni_dev_ip']
logger.out('Setting up VNI network on interface {} with IP {}'.format(vni_dev, vni_dev_ip), state='i')
common.run_os_command('ip link set {} up'.format(vni_dev))
common.run_os_command('ip address add {} dev {}'.format(vni_dev_ip, vni_dev))
# Storage configuration
storage_dev = config['storage_dev']
storage_dev_ip = config['storage_dev_ip']
logger.out('Setting up Storage network on interface {} with IP {}'.format(storage_dev, storage_dev_ip), state='i')
common.run_os_command('ip link set {} up'.format(storage_dev))
common.run_os_command('ip address add {} dev {}'.format(storage_dev_ip, storage_dev))
###############################################################################
# PHASE 7a - Ensure Libvirt is running on the local host
###############################################################################
# Start the libvirt service using systemctl
logger.out('Starting Libvirt daemon', state='i')
common.run_os_command('systemctl start libvirtd.service')
time.sleep(1)
# Check that libvirtd is listening on TCP
libvirt_check_name = "qemu+tcp://127.0.0.1:16509/system"
logger.out('Connecting to Libvirt daemon at {}'.format(libvirt_check_name), state='i')
try:
lv_conn = libvirt.open(libvirt_check_name)
lv_conn.close()
except Exception as e:
logger.out('ERROR: Failed to connect to Libvirt daemon: {}'.format(e), state='e')
exit(1)
###############################################################################
# PHASE 7b - Ensure Ceph is running on the local host
###############################################################################
# if coordinator, start ceph-mon
# if hypervisor or coordinator, start ceph-osds
###############################################################################
# PHASE 7c - Ensure NFT is running on the local host
###############################################################################
logger.out("Creating NFT firewall configuration", state='i')
# Create our config dirs
common.run_os_command(
'/bin/mkdir --parents {}/networks'.format(
config['nft_dynamic_directory']
)
)
common.run_os_command(
'/bin/mkdir --parents {}/static'.format(
config['nft_dynamic_directory']
)
)
common.run_os_command(
'/bin/mkdir --parents {}'.format(
config['nft_dynamic_directory']
)
)
# Set up the basic features of the nftables firewall
nftables_base_rules = """# Base rules
flush ruleset
# Add the filter table and chains
add table inet filter
add chain inet filter forward {{ type filter hook forward priority 0; }}
add chain inet filter input {{ type filter hook input priority 0; }}
# Include static rules and network rules
include "{rulesdir}/static/*"
include "{rulesdir}/networks/*"
""".format(
rulesdir=config['nft_dynamic_directory']
)
# Write the basic firewall config
nftables_base_filename = '{}/base.nft'.format(config['nft_dynamic_directory'])
nftables_update_filename = '{}/update'.format(config['nft_dynamic_directory'])
with open(nftables_base_filename, 'w') as nfbasefile:
nfbasefile.write(nftables_base_rules)
# Notify a reload of the firewall rules on next keepalive update
open(nftables_update_filename, 'a').close()
###############################################################################
# PHASE 8 - Set up our objects
###############################################################################
logger.out('Setting up objects', state='i')
d_node = dict()
d_network = dict()
d_domain = dict()
node_list = []
network_list = []
domain_list = []
# Node objects
@zk_conn.ChildrenWatch('/nodes')
def update_nodes(new_node_list):
global node_list, d_node
# Add any missing nodes to the list
for node in new_node_list:
if not node in node_list:
d_node[node] = NodeInstance.NodeInstance(node, myhostname, zk_conn, config, logger, d_node, d_network, d_domain)
# Remove any deleted nodes from the list
for node in node_list:
if not node in new_node_list:
# Delete the object
del(d_node[node])
# Update and print new list
node_list = new_node_list
logger.out('{}Node list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(node_list)), state='i')
# Update node objects' list
for node in d_node:
d_node[node].update_node_list(d_node)
# Alias for our local node (passed to network and domain objects)
this_node = d_node[myhostname]
# Network objects
@zk_conn.ChildrenWatch('/networks')
def update_networks(new_network_list):
global network_list, d_network
# Add any missing networks to the list
for network in new_network_list:
if not network in network_list:
d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zk_conn, config, logger, this_node)
# Start primary functionality
if this_node.router_state == 'primary':
d_network[network].createGatewayAddress()
d_network[network].startDHCPServer()
# Remove any deleted networks from the list
for network in network_list:
if not network in new_network_list:
# Stop primary functionality
if this_node.router_state == 'primary':
d_network[network].stopDHCPServer()
d_network[network].removeGatewayAddress()
# Stop general functionality
d_network[network].removeFirewall()
d_network[network].removeNetwork()
# Delete the object
del(d_network[network])
# Update and print new list
network_list = new_network_list
logger.out('{}Network list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(network_list)), state='i')
# Update node objects' list
for node in d_node:
d_node[node].update_network_list(d_network)
# VM domain objects
@zk_conn.ChildrenWatch('/domains')
def update_domains(new_domain_list):
global domain_list, d_domain
# Add any missing domains to the list
for domain in new_domain_list:
if not domain in domain_list:
d_domain[domain] = DomainInstance.DomainInstance(domain, zk_conn, config, logger, this_node)
# Remove any deleted domains from the list
for domain in domain_list:
if not domain in new_domain_list:
# Delete the object
del(d_domain[domain])
# Update and print new list
domain_list = new_domain_list
logger.out('{}Domain list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(domain_list)), state='i')
# Update node objects' list
for node in d_node:
d_node[node].update_domain_list(d_domain)
###############################################################################
# PHASE 9 - Run the daemon
###############################################################################
# Set up our update function
update_zookeeper = this_node.update_zookeeper
# Start keepalive thread and immediately update Zookeeper
startKeepaliveTimer()
update_zookeeper()
# Tick loop
while True:
try:
time.sleep(1)
except:
break

node-daemon/pvcd/DomainInstance.py Normal file

@@ -0,0 +1,438 @@
#!/usr/bin/env python3
# DomainInstance.py - Class implementing a PVC virtual machine in pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import uuid
import socket
import time
import threading
import libvirt
import kazoo.client
import pvcd.log as log
import pvcd.zkhandler as zkhandler
class DomainInstance:
# Initialization function
def __init__(self, domuuid, zk_conn, config, logger, this_node):
# Passed-in variables on creation
self.domuuid = domuuid
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.this_node = this_node
# These will all be set later
self.node = None
self.state = None
self.instart = False
self.inrestart = False
self.inmigrate = False
self.inreceive = False
self.inshutdown = False
self.instop = False
self.dom = self.lookupByUUID(self.domuuid)
# Watch for changes to the state field in Zookeeper
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
def watch_state(data, stat, event=""):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# If we get a delete state, just terminate ourselves
if data == None:
return
# Otherwise perform a management command
else:
self.manage_vm_state()
# Get data functions
def getstate(self):
return self.state
def getnode(self):
return self.node
def getdom(self):
return self.dom
def getmemory(self):
try:
memory = int(self.dom.info()[2] / 1024)
except:
memory = 0
return memory
def getvcpus(self):
try:
vcpus = int(self.dom.info()[3])
except:
vcpus = 0
return vcpus
# Manage local node domain_list
def addDomainToList(self):
if not self.domuuid in self.this_node.domain_list:
try:
# Add the domain to the domain_list array
self.this_node.domain_list.append(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list) })
except Exception as e:
self.logger.out('Error adding domain to list: {}'.format(e), state='c')
def removeDomainFromList(self):
if self.domuuid in self.this_node.domain_list:
try:
# Remove the domain from the domain_list array
self.this_node.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list) })
except Exception as e:
self.logger.out('Error removing domain from list: {}'.format(e), state='c')
# Start up the VM
def start_vm(self):
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.instart = False
return
# Try to get the current state in case it's already running
try:
self.dom = self.lookupByUUID(self.domuuid)
curstate = self.dom.state()[0]
except:
curstate = 'notstart'
if curstate == libvirt.VIR_DOMAIN_RUNNING:
# If it is running just update the model
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
else:
# Or try to create it
try:
# Grab the domain information from Zookeeper
xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList()
self.logger.out('Successfully started VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = dom
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
except libvirt.libvirtError as e:
self.logger.out('Failed to create VM', state='e', prefix='Domain {}:'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'failed' })
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) })
self.dom = None
lv_conn.close()
self.instart = False
# Restart the VM
def restart_vm(self):
self.logger.out('Restarting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.inrestart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.inrestart = False
return
self.shutdown_vm()
self.start_vm()
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
lv_conn.close()
self.inrestart = False
# Stop the VM forcibly without updating state
def terminate_vm(self):
self.logger.out('Terminating VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
except AttributeError:
self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.removeDomainFromList()
self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = None
self.instop = False
# Stop the VM forcibly
def stop_vm(self):
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
except AttributeError:
self.logger.out('Failed to stop VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.removeDomainFromList()
if self.inrestart == False:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = None
self.instop = False
# Shutdown the VM gracefully
def shutdown_vm(self):
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.inshutdown = True
self.dom.shutdown()
try:
tick = 0
while self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING and tick < 60:
tick += 1
time.sleep(0.5)
if tick >= 60:
self.logger.out('Shutdown timeout expired', state='e', prefix='Domain {}:'.format(self.domuuid))
self.stop_vm()
self.inshutdown = False
return
except:
pass
self.removeDomainFromList()
if self.inrestart == False:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = None
self.inshutdown = False
def live_migrate_vm(self, dest_node):
try:
dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node))
if dest_lv_conn == None:
raise
except:
self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid))
return 1
try:
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0)
if target_dom == None:
raise
self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
except:
dest_lv_conn.close()
return 1
dest_lv_conn.close()
return 0
# Migrate the VM to a target host
def migrate_vm(self):
self.inmigrate = True
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid))
try:
migrate_ret = self.live_migrate_vm(self.node)
except:
migrate_ret = 0
if migrate_ret != 0:
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
self.shutdown_vm()
time.sleep(1)
else:
self.removeDomainFromList()
time.sleep(1)
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.inmigrate = False
# Receive the migration from another host (wait until VM is running)
def receive_migrate(self):
self.inreceive = True
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
time.sleep(0.5)
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
if self.dom == None and self.state == 'migrate':
continue
if self.state != 'migrate':
break
try:
if self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING:
break
except:
continue
try:
dom_state = self.dom.state()[0]
except AttributeError:
dom_state = None
if dom_state == libvirt.VIR_DOMAIN_RUNNING:
self.addDomainToList()
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
else:
self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.inreceive = False
#
# Main function to manage a VM (taking only self)
#
def manage_vm_state(self):
# Give ourselves a bit of leeway time
time.sleep(0.2)
# Get the current values from zookeeper (don't rely on the watch)
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid))
# Check the current state of the VM
try:
if self.dom != None:
running, reason = self.dom.state()
else:
raise
except:
running = libvirt.VIR_DOMAIN_NOSTATE
self.logger.out('VM state change for "{}": {} {}'.format(self.domuuid, self.state, self.node), state='i')
#######################
# Handle state changes
#######################
# Valid states are:
# start
# migrate
# restart
# shutdown
# stop
# Conditional pass one - Are we already performing an action
if self.instart == False \
and self.inrestart == False \
and self.inmigrate == False \
and self.inreceive == False \
and self.inshutdown == False \
and self.instop == False:
# Conditional pass two - Is this VM configured to run on this node
if self.node == self.this_node.name:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM is already running and should be
if self.state == "start":
self.addDomainToList()
# VM is already running and should be but stuck in migrate state
elif self.state == "migrate":
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.addDomainToList()
# VM should be restarted
elif self.state == "restart":
self.restart_vm()
# VM should be shut down
elif self.state == "shutdown":
self.shutdown_vm()
# VM should be stopped
elif self.state == "stop":
self.stop_vm()
else:
# VM should be started
if self.state == "start":
self.start_vm()
# VM should be migrated to this node
elif self.state == "migrate":
self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running)
if self.state == "restart":
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
# VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown":
self.removeDomainFromList()
# VM should be stopped; ensure it's gone from this node's domain_list
elif self.state == "stop":
self.removeDomainFromList()
else:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM should be migrated away from this node
if self.state == "migrate":
self.migrate_vm()
# VM should be terminated
else:
self.terminate_vm()
# This function is a wrapper for libvirt.lookupByUUID which fixes some problems
# 1. Takes a text UUID and handles converting it to bytes
# 2. Tries the lookup and returns None if the domain is not found
def lookupByUUID(self, tuuid):
lv_conn = None
dom = None
libvirt_name = "qemu:///system"
# Convert the text UUID to bytes
buuid = uuid.UUID(tuuid).bytes
# Try
try:
# Open a libvirt connection
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
return dom
# Lookup the UUID
dom = lv_conn.lookupByUUID(buuid)
# Fail
except:
pass
# After everything
finally:
# Close the libvirt connection
if lv_conn != None:
lv_conn.close()
# Return the dom object (or None)
return dom

node-daemon/pvcd/NodeInstance.py Normal file

@@ -0,0 +1,695 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import psutil
import socket
import time
import libvirt
import threading
import subprocess
import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.common as common
class NodeInstance():
# Initialization function
def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain):
# Passed-in variables on creation
self.name = name
self.this_node = this_node
self.zk_conn = zk_conn
self.config = config
self.logger = logger
# The IPMI hostname for fencing
self.ipmi_hostname = self.config['ipmi_hostname']
# Which node is primary
self.primary_node = None
# States
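# (daemon_state: init/run/stop/dead; router_state: client/primary/secondary;
# domain_state: ready/flush/flushed/unflush)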
self.daemon_mode = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonmode'.format(self.name))
self.daemon_state = 'stop'
self.router_state = 'client'
self.domain_state = 'ready'
# Object lists
self.d_node = d_node
self.d_network = d_network
self.d_domain = d_domain
# Printable lists
self.active_node_list = []
self.flushed_node_list = []
self.inactive_node_list = []
self.network_list = []
self.domain_list = []
# Node resources
self.networks_count = 0
self.domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Flags
self.inflush = False
# Zookeeper handlers for changed states
@self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
def watch_node_daemonstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'stop'
if data != self.daemon_state:
self.daemon_state = data
@self.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
def watch_node_routerstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'client'
if self.name == self.this_node and self.daemon_mode == 'coordinator':
# We're a coordinator so we care about networking
if data != self.router_state:
self.router_state = data
if self.router_state == 'primary':
self.become_primary()
else:
self.become_secondary()
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
def watch_node_domainstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'unknown'
if data != self.domain_state:
self.domain_state = data
# toggle state management of this node
if self.name == self.this_node:
if self.domain_state == 'flush' and self.inflush == False:
# Do flushing in a thread so it doesn't block the migrates out
flush_thread = threading.Thread(target=self.flush, args=(), kwargs={})
flush_thread.start()
if self.domain_state == 'unflush' and self.inflush == False:
self.unflush()
@self.zk_conn.DataWatch('/primary_node')
def watch_primary_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'none'
if data != self.primary_node:
if self.daemon_mode == 'coordinator':
# We're a coordinator so we care about networking
if data == 'none':
# Toggle state management of routing functions
if self.name == self.this_node:
if self.daemon_state == 'run' and self.router_state != 'primary':
# Contend for primary
self.logger.out('Contending for primary routing state', state='i')
zkhandler.writedata(self.zk_conn, {'/primary_node': self.name })
elif data == self.this_node:
if self.name == self.this_node:
zkhandler.writedata(self.zk_conn, { '/nodes/{}/routerstate'.format(self.name): 'primary' })
self.primary_node = data
else:
if self.name == self.this_node:
zkhandler.writedata(self.zk_conn, { '/nodes/{}/routerstate'.format(self.name): 'secondary' })
self.primary_node = data
else:
self.primary_node = data
@self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
def watch_node_memfree(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
def watch_node_memused(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
def watch_node_memalloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
def watch_node_vcpualloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
def watch_node_runningdomains(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii').split()
except AttributeError:
data = []
if data != self.domain_list:
self.domain_list = data
@self.zk_conn.DataWatch('/nodes/{}/networkscount'.format(self.name))
def watch_node_networkscount(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.networks_count:
self.networks_count = data
@self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
def watch_node_domainscount(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.domains_count:
self.domains_count = data
# Update value functions
def update_node_list(self, d_node):
self.d_node = d_node
def update_network_list(self, d_network):
self.d_network = d_network
network_list = []
for network in self.d_network:
network_list.append(d_network[network].vni)
self.network_list = network_list
def update_domain_list(self, d_domain):
self.d_domain = d_domain
# Routing primary/secondary states
def become_secondary(self):
self.logger.out('Setting router {} to secondary state'.format(self.name), state='i')
self.logger.out('Network list: {}'.format(', '.join(self.network_list)))
time.sleep(0.5)
for network in self.d_network:
self.d_network[network].stopDHCPServer()
self.d_network[network].removeGatewayAddress()
def become_primary(self):
self.logger.out('Setting router {} to primary state.'.format(self.name), state='i')
self.logger.out('Network list: {}'.format(', '.join(self.network_list)))
for network in self.d_network:
self.d_network[network].createGatewayAddress()
self.d_network[network].startDHCPServer()
# Flush all VMs on the host
def flush(self):
self.inflush = True
self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
self.logger.out('Domain list: {}'.format(', '.join(self.domain_list)))
fixed_domain_list = self.domain_list.copy()
for dom_uuid in fixed_domain_list:
self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
target_node = findTargetHypervisor(self.zk_conn, self.config['migration_target_selector'], dom_uuid)
if target_node == None:
self.logger.out('Failed to find migration target for VM "{}"; shutting down'.format(dom_uuid), state='e')
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' })
else:
self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/node'.format(dom_uuid): target_node,
'/domains/{}/lastnode'.format(dom_uuid): current_node
})
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
while True:
time.sleep(1)
vm_current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid))
if vm_current_state == "start":
break
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' })
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
self.inflush = False
def unflush(self):
self.inflush = True
self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' })
fixed_domain_list = self.d_domain.copy()
for dom_uuid in fixed_domain_list:
try:
last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
except:
continue
if last_node != self.name:
continue
self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/node'.format(dom_uuid): self.name,
'/domains/{}/lastnode'.format(dom_uuid): ''
})
self.inflush = False
def update_zookeeper(self):
# Connect to libvirt
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
return
# Get past state and update if needed
past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name))
if past_state != 'run':
self.daemon_state = 'run'
zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' })
else:
self.daemon_state = 'run'
# Ensure the primary key is properly set
if self.name == self.this_node:
if self.router_state == 'primary':
if zkhandler.readdata(self.zk_conn, '/primary_node') != self.name:
zkhandler.writedata(self.zk_conn, {'/primary_node': self.name})
# Toggle state management of dead VMs to restart them
memalloc = 0
vcpualloc = 0
for domain, instance in self.d_domain.items():
if domain in self.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == 'start' and instance.getnode() == self.name:
if instance.getdom() != None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
raise
except Exception as e:
# Toggle a state "change"
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
# Ensure that any running VMs are readded to the domain_list
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
for domain in running_domains:
domain_uuid = domain.UUIDString()
if domain_uuid not in self.domain_list:
self.domain_list.append(domain_uuid)
# Set our information in zookeeper
#self.name = lv_conn.getHostname()
self.memused = int(psutil.virtual_memory().used / 1024 / 1024)
self.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
self.memalloc = memalloc
self.vcpualloc = vcpualloc
self.cpuload = os.getloadavg()[0]
self.domains_count = len(lv_conn.listDomainsID())
keepalive_time = int(time.time())
try:
zkhandler.writedata(self.zk_conn, {
'/nodes/{}/memused'.format(self.name): str(self.memused),
'/nodes/{}/memfree'.format(self.name): str(self.memfree),
'/nodes/{}/memalloc'.format(self.name): str(self.memalloc),
'/nodes/{}/vcpualloc'.format(self.name): str(self.vcpualloc),
'/nodes/{}/cpuload'.format(self.name): str(self.cpuload),
'/nodes/{}/networkscount'.format(self.name): str(self.networks_count),
'/nodes/{}/domainscount'.format(self.name): str(self.domains_count),
'/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list),
'/nodes/{}/keepalive'.format(self.name): str(keepalive_time)
})
except:
self.logger.out('Failed to set keepalive data', state='e')
return
# Close the Libvirt connection
lv_conn.close()
# Display node information to the terminal
self.logger.out('{}{} keepalive{}'.format(self.logger.fmt_purple, self.name, self.logger.fmt_end), state='t')
self.logger.out(
'{bold}Domains:{nobold} {domcount} '
'{bold}Networks:{nobold} {netcount} '
'{bold}Allocated memory [MiB]:{nobold} {allocmem} '
'{bold}Free memory [MiB]:{nobold} {freemem} '
'{bold}Used memory [MiB]:{nobold} {usedmem} '
'{bold}Load:{nobold} {load}'.format(
bold=self.logger.fmt_bold,
nobold=self.logger.fmt_end,
domcount=self.domains_count,
freemem=self.memfree,
usedmem=self.memused,
load=self.cpuload,
allocmem=self.memalloc,
netcount=self.networks_count
),
)
# Update our local node lists
for node_name in self.d_node:
try:
node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name))
node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name))
node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name)))
except:
node_daemon_state = 'unknown'
node_domain_state = 'unknown'
node_keepalive = 0
# Handle deadtime and fencing if needed
# (A node is considered dead when its keepalive timer is more than
# fence_intervals * keepalive_interval seconds out-of-date while in 'run' state)
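# With the default settings (keepalive_interval = 5, fence_intervals = 6), a node is
# therefore declared dead once its keepalive is more than 30 seconds stale.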
node_deadtime = int(time.time()) - ( int(self.config['keepalive_interval']) * int(self.config['fence_intervals']) )
if node_keepalive < node_deadtime and node_daemon_state == 'run':
self.logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn, self.config, self.logger), kwargs={})
fence_thread.start()
# Update the arrays
if node_daemon_state == 'run' and node_domain_state != 'flushed' and node_name not in self.active_node_list:
self.active_node_list.append(node_name)
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_daemon_state != 'run' and node_domain_state != 'flushed' and node_name not in self.inactive_node_list:
self.inactive_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
if node_domain_state == 'flushed' and node_name not in self.flushed_node_list:
self.flushed_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
# List of the non-primary coordinators
secondary_node_list = self.config['coordinators'].split(',')
if self.primary_node in secondary_node_list:
secondary_node_list.remove(self.primary_node)
for node in secondary_node_list.copy():
if node in self.inactive_node_list:
secondary_node_list.remove(node)
# Display cluster information to the terminal
self.logger.out('{}Cluster status{}'.format(self.logger.fmt_purple, self.logger.fmt_end), state='t')
self.logger.out('{}Primary coordinator:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, self.primary_node))
self.logger.out('{}Secondary coordinators:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(secondary_node_list)))
self.logger.out('{}Active hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.active_node_list)))
self.logger.out('{}Flushed hypervisors:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.flushed_node_list)))
self.logger.out('{}Inactive nodes:{} {}'.format(self.logger.fmt_bold, self.logger.fmt_end, ' '.join(self.inactive_node_list)))
#
# Find a migration target
#
def findTargetHypervisor(zk_conn, search_field, dom_uuid):
if search_field == 'mem':
return findTargetHypervisorMem(zk_conn, dom_uuid)
if search_field == 'load':
return findTargetHypervisorLoad(zk_conn, dom_uuid)
if search_field == 'vcpus':
return findTargetHypervisorVCPUs(zk_conn, dom_uuid)
if search_field == 'vms':
return findTargetHypervisorVMs(zk_conn, dom_uuid)
return None
# Get the list of valid target nodes
def getHypervisors(zk_conn, dom_uuid):
valid_node_list = []
full_node_list = zkhandler.listchildren(zk_conn, '/nodes')
current_node = zkhandler.readdata(zk_conn, '/domains/{}/node'.format(dom_uuid))
for node in full_node_list:
daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node))
domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node))
if node == current_node:
continue
if daemon_state != 'run' or domain_state != 'ready':
continue
valid_node_list.append(node)
return valid_node_list
# via free memory (relative to allocated memory)
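# Selects the node with the most "allocation-free" memory, i.e. total memory minus
# memory already allocated to VMs, regardless of how much is currently in use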
def findTargetHypervisorMem(zk_conn, dom_uuid):
most_allocfree = 0
target_node = None
node_list = getHypervisors(zk_conn, dom_uuid)
for node in node_list:
memalloc = int(zkhandler.readdata(zk_conn, '/nodes/{}/memalloc'.format(node)))
memused = int(zkhandler.readdata(zk_conn, '/nodes/{}/memused'.format(node)))
memfree = int(zkhandler.readdata(zk_conn, '/nodes/{}/memfree'.format(node)))
memtotal = memused + memfree
allocfree = memtotal - memalloc
if allocfree > most_allocfree:
most_allocfree = allocfree
target_node = node
return target_node
# via load average
def findTargetHypervisorLoad(zk_conn, dom_uuid):
least_load = 9999
target_node = None
node_list = getHypervisors(zk_conn, dom_uuid)
for node in node_list:
load = float(zkhandler.readdata(zk_conn, '/nodes/{}/load'.format(node)))
if load < least_load:
least_load = load
target_node = node
return target_node
# via total vCPUs
def findTargetHypervisorVCPUs(zk_conn, dom_uuid):
least_vcpus = 9999
target_node = None
node_list = getHypervisors(zk_conn, dom_uuid)
for node in node_list:
vcpus = int(zkhandler.readdata(zk_conn, '/nodes/{}/vcpualloc'.format(node)))
if vcpus < least_vcpus:
least_vcpus = vcpus
target_node = node
return target_node
# via total VMs
def findTargetHypervisorVMs(zk_conn, dom_uuid):
least_vms = 9999
target_node = None
node_list = getHypervisors(zk_conn, dom_uuid)
for node in node_list:
vms = int(zkhandler.readdata(zk_conn, '/nodes/{}/domainscount'.format(node)))
if vms < least_vms:
least_vms = vms
target_node = node
return target_node
#
# Fence thread entry function
#
def fenceNode(node_name, zk_conn, config, logger):
failcount = 0
# We allow exactly 3 saving throws for the host to come back online
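# (three throws at 5-second intervals gives the node roughly 15 seconds to recover)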
while failcount < 3:
# Wait 5 seconds
time.sleep(5)
# Get the state
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
# Is it still 'dead'
if node_daemon_state == 'dead':
failcount += 1
logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
# It changed back to something else so it must be alive
else:
logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
return
logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='e')
# Get IPMI information
ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))
# Shoot it in the head
fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
# Hold to ensure the fence takes effect
time.sleep(3)
# Force into secondary network state if needed
if node_name in config['coordinators'].split(','):
zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
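# If the fenced node was the primary coordinator, relinquish the role so another
# coordinator can claim it and take over gateway/DHCP duties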
if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
# If the fence succeeded and successful_fence is migrate
if fence_status == True and config['successful_fence'] == 'migrate':
migrateFromFencedNode(zk_conn, node_name, logger)
# If the fence failed and failed_fence is migrate
if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
migrateFromFencedNode(zk_conn, node_name, logger)
# Migrate hosts away from a fenced node
def migrateFromFencedNode(zk_conn, node_name, logger):
logger.out('Moving VMs from dead node "{}" to new hosts'.format(node_name), state='i')
dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
for dom_uuid in dead_node_running_domains:
target_node = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
logger.out('Moving VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
zkhandler.writedata(zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'start',
'/domains/{}/node'.format(dom_uuid): target_node,
'/domains/{}/lastnode'.format(dom_uuid): node_name
})
# Set node in flushed state for easy remigrating when it comes back
zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })
#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
ipmi_command = ['/usr/bin/ipmitool', '-I', 'lanplus', '-H', ipmi_hostname, '-U', ipmi_user, '-P', ipmi_password, 'chassis', 'power', 'reset']
ipmi_command_output = subprocess.run(ipmi_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if ipmi_command_output.returncode == 0:
logger.out('Successfully rebooted dead node', state='o')
return True
else:
logger.out('Failed to reboot dead node', state='e')
return False

View File

@ -0,0 +1,427 @@
#!/usr/bin/env python3
# VXNetworkInstance.py - Class implementing a PVC VM network and run by pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
from textwrap import dedent
import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.common as common
class VXNetworkInstance():
# Initialization function
def __init__ (self, vni, zk_conn, config, logger, this_node):
self.vni = vni
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.this_node = this_node
self.vni_dev = config['vni_dev']
self.old_description = None
self.description = None
self.domain = None
self.ip_gateway = zkhandler.readdata(self.zk_conn, '/networks/{}/ip_gateway'.format(self.vni))
self.ip_network = None
self.ip_cidrnetmask = None
self.dhcp_flag = ( zkhandler.readdata(self.zk_conn, '/networks/{}/dhcp_flag'.format(self.vni)) == 'True' )
self.dhcp_start = None
self.dhcp_end = None
self.vxlan_nic = 'vxlan{}'.format(self.vni)
self.bridge_nic = 'br{}'.format(self.vni)
self.nftables_update_filename = '{}/update'.format(config['nft_dynamic_directory'])
self.nftables_netconf_filename = '{}/networks/{}.nft'.format(config['nft_dynamic_directory'], self.vni)
self.firewall_rules = []
self.dhcp_server_daemon = None
self.dnsmasq_hostsdir = '{}/{}'.format(config['dnsmasq_dynamic_directory'], self.vni)
self.dhcp_reservations = []
# Zookeeper handlers for changed states
@self.zk_conn.DataWatch('/networks/{}'.format(self.vni))
def watch_network_description(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.description != data.decode('ascii'):
self.old_description = self.description
self.description = data.decode('ascii')
@self.zk_conn.DataWatch('/networks/{}/domain'.format(self.vni))
def watch_network_domain(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.domain != data.decode('ascii'):
domain = data.decode('ascii')
self.domain = domain
@self.zk_conn.DataWatch('/networks/{}/ip_network'.format(self.vni))
def watch_network_ip_network(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip_network != data.decode('ascii'):
ip_network = data.decode('ascii')
self.ip_network = ip_network
self.ip_cidrnetmask = ip_network.split('/')[-1]
@self.zk_conn.DataWatch('/networks/{}/ip_gateway'.format(self.vni))
def watch_network_gateway(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip_gateway != data.decode('ascii'):
orig_gateway = self.ip_gateway
self.ip_gateway = data.decode('ascii')
if self.this_node.router_state == 'primary':
if orig_gateway:
self.removeGatewayAddress()
self.createGatewayAddress()
@self.zk_conn.DataWatch('/networks/{}/dhcp_flag'.format(self.vni))
def watch_network_dhcp_status(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp_flag != ( data.decode('ascii') == 'True' ):
self.dhcp_flag = ( data.decode('ascii') == 'True' )
if self.dhcp_flag and self.this_node.router_state == 'primary':
self.startDHCPServer()
elif self.this_node.router_state == 'primary':
self.stopDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/dhcp_start'.format(self.vni))
def watch_network_dhcp_start(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp_start != data.decode('ascii'):
self.dhcp_start = data.decode('ascii')
@self.zk_conn.DataWatch('/networks/{}/dhcp_end'.format(self.vni))
def watch_network_dhcp_end(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp_end != data.decode('ascii'):
self.dhcp_end = data.decode('ascii')
@self.zk_conn.ChildrenWatch('/networks/{}/dhcp_reservations'.format(self.vni))
def watch_network_dhcp_reservations(new_reservations, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if self.dhcp_reservations != new_reservations:
old_reservations = self.dhcp_reservations
self.dhcp_reservations = new_reservations
self.updateDHCPReservations(old_reservations, new_reservations)
@self.zk_conn.ChildrenWatch('/networks/{}/firewall_rules'.format(self.vni))
def watch_network_firewall_rules(new_rules, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if self.firewall_rules != new_rules:
old_rules = self.firewall_rules
self.firewall_rules = new_rules
self.updateFirewallRules(old_rules, new_rules)
self.createNetwork()
self.createFirewall()
def getvni(self):
return self.vni
def updateDHCPReservations(self, old_reservations_list, new_reservations_list):
for reservation in new_reservations_list:
if reservation not in old_reservations_list:
# Add new reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
ipaddr = zkhandler.readdata(
self.zk_conn,
'/networks/{}/dhcp_reservations/{}/ipaddr'.format(
self.vni,
reservation
)
)
entry = '{},{}'.format(reservation, ipaddr)
outfile = open(filename, 'w')
outfile.write(entry)
outfile.close()
for reservation in old_reservations_list:
if reservation not in new_reservations_list:
# Remove old reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
try:
os.remove(filename)
self.dhcp_server_daemon.signal('hup')
except:
pass
def updateFirewallRules(self, old_rules_list, new_rules_list):
for rule in new_rules_list:
if rule not in old_rules_list:
# Add new rule entry
pass
for rule in old_rules_list:
if rule not in new_rules_list:
pass
def createNetwork(self):
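# Create the VXLAN interface on top of the cluster VNI device, attach it to a new
# bridge, and bring both interfaces up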
self.logger.out(
'Creating VXLAN device on interface {}'.format(
self.vni_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link add {} type vxlan id {} dstport 4789 dev {}'.format(
self.vxlan_nic,
self.vni,
self.vni_dev
)
)
common.run_os_command(
'brctl addbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'brctl addif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
common.run_os_command(
'ip link set {} up'.format(
self.vxlan_nic
)
)
common.run_os_command(
'ip link set {} up'.format(
self.bridge_nic
)
)
def createFirewall(self):
nftables_network_rules = """# Rules for network {vxlannic}
add chain inet filter {vxlannic}-in
add chain inet filter {vxlannic}-out
add rule inet filter {vxlannic}-in counter
add rule inet filter {vxlannic}-out counter
# Jump from forward chain to this chain when matching net
add rule inet filter forward ip daddr {netaddr} counter jump {vxlannic}-in
add rule inet filter forward ip saddr {netaddr} counter jump {vxlannic}-out
# Allow ICMP traffic into the router from network
add rule inet filter input ip protocol icmp meta iifname {bridgenic} counter accept
# Allow DNS and DHCP traffic into the router from network
add rule inet filter input tcp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 67 meta iifname {bridgenic} counter accept
# Block traffic into the router from network
add rule inet filter input meta iifname {bridgenic} counter drop
""".format(
netaddr=self.ip_network,
vxlannic=self.vxlan_nic,
bridgenic=self.bridge_nic
)
print(nftables_network_rules)
with open(self.nftables_netconf_filename, 'w') as nfbasefile:
nfbasefile.write(dedent(nftables_network_rules))
open(self.nftables_update_filename, 'a').close()
pass
def createGatewayAddress(self):
if self.this_node.router_state == 'primary':
self.logger.out(
'Creating gateway {} on interface {}'.format(
self.ip_gateway,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
print('ip address add {}/{} dev {}'.format(
self.ip_gateway,
self.ip_cidrnetmask,
self.bridge_nic
))
common.run_os_command(
'ip address add {}/{} dev {}'.format(
self.ip_gateway,
self.ip_cidrnetmask,
self.bridge_nic
)
)
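# Send gratuitous ARP replies so clients update their caches to the new primary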
common.run_os_command(
'arping -A -c2 -I {} {}'.format(
self.bridge_nic,
self.ip_gateway
),
background=True
)
def startDHCPServer(self):
if self.this_node.router_state == 'primary':
self.logger.out(
'Starting dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Create the network hostsdir
common.run_os_command(
'/bin/mkdir --parents {}'.format(
self.dnsmasq_hostsdir
)
)
# Recreate the environment we need for dnsmasq
pvcd_config_file = os.environ['PVCD_CONFIG_FILE']
dhcp_environment = {
'DNSMASQ_INTERFACE': self.bridge_nic,
'PVCD_CONFIG_FILE': pvcd_config_file
}
# Define the dnsmasq config
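# (authoritative DNS for the network domain, leases handled by the Zookeeper-backed
# dhcp-script, and static host reservations read from the per-network hostsdir)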
dhcp_configuration = [
'--domain-needed',
'--bogus-priv',
'--no-resolv',
'--filterwin2k',
'--expand-hosts',
'--domain={}'.format(self.domain),
'--local=/{}/'.format(self.domain),
'--auth-zone={}'.format(self.domain),
# '--auth-peer=127.0.0.1,{}'.format(self.ip_gateway),
'--auth-sec-servers=127.0.0.1,[::1],{}'.format(self.ip_gateway),
'--listen-address={}'.format(self.ip_gateway),
'--bind-interfaces',
'--leasefile-ro',
'--dhcp-script=/usr/share/pvc/pvcd/dnsmasq-zookeeper-leases.py',
'--dhcp-range={},{},4h'.format(self.dhcp_start, self.dhcp_end),
'--dhcp-lease-max=99',
'--dhcp-hostsdir={}'.format(self.dnsmasq_hostsdir),
'--log-queries=extra',
'--log-facility=DAEMON',
'--keep-in-foreground'
]
# Start the dnsmasq process in a thread
self.dhcp_server_daemon = common.run_os_daemon(
'/usr/sbin/dnsmasq {}'.format(
' '.join(dhcp_configuration)
),
environment=dhcp_environment
)
def removeNetwork(self):
self.logger.out(
'Removing VNI device on interface {}'.format(
self.vni_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link set {} down'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link set {} down'.format(
self.vxlan_nic
)
)
common.run_os_command(
'brctl delif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
common.run_os_command(
'brctl delbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link delete {}'.format(
self.vxlan_nic
)
)
def removeFirewall(self):
os.remove(self.nftables_netconf_filename)
open(self.nftables_update_filename, 'a').close()
pass
def removeGatewayAddress(self):
self.logger.out(
'Removing gateway {} from interface {}'.format(
self.ip_gateway,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip address delete {}/{} dev {}'.format(
self.ip_gateway,
self.ip_cidrnetmask,
self.bridge_nic
)
)
def stopDHCPServer(self):
if self.dhcp_server_daemon:
self.logger.out(
'Stopping dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
self.dhcp_server_daemon.signal('term')

View File

@ -0,0 +1,83 @@
#!/usr/bin/env python3
# common.py - PVC daemon function library, common functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import subprocess
import threading
import signal
import os
import time
import pvcd.log as log
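# Wrapper class for a long-running child process (e.g. dnsmasq) that the caller can
# later signal by name via signal()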
class OSDaemon(object):
def __init__(self, command, environment):
self.proc = subprocess.Popen(
command,
env=environment,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
def signal(self, sent_signal):
signal_map = {
'hup': signal.SIGHUP,
'int': signal.SIGINT,
'term': signal.SIGTERM
}
self.proc.send_signal(signal_map[sent_signal])
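# Launch a command as a managed daemon process and return its OSDaemon wrapper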
def run_os_daemon(command_string, environment=None):
command = command_string.split()
print(' '.join(command))
daemon = OSDaemon(command, environment)
return daemon
# Run a one-shot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None):
command = command_string.split()
if background:
def runcmd():
subprocess.run(
command,
env=environment,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
thread = threading.Thread(target=runcmd, args=())
thread.start()
return 0, None, None
else:
command_output = subprocess.run(
command,
env=environment,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return command_output.returncode, command_output.stdout.decode('ascii'), command_output.stderr.decode('ascii')
# Reload the firewall rules of the system
def reload_firewall_rules(rules_dir):
log.echo('Updating firewall rules', '', 'o')
rules_file = '{}/base.nft'.format(rules_dir)
retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file))
if retcode != 0:
log.echo('Failed to reload rules: {}'.format(stderr), '', 'e')

View File

@ -0,0 +1,139 @@
#!/usr/bin/python3
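# dnsmasq-zookeeper-leases.py - Store and retrieve dnsmasq DHCP leases in Zookeeper
# Part of the Parallel Virtual Cluster (PVC) system
#
# Invoked by dnsmasq as its --dhcp-script hook: dnsmasq passes an action ("add",
# "old", or "del") plus the lease details as arguments and environment variables,
# and calls "init" at startup to repopulate its lease database when --leasefile-ro
# is set.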
import argparse
import configparser
import os, sys
import socket
import kazoo.client
import re
#
# Variables
#
#
# General Functions
#
def get_zookeeper_key():
# Get the interface from environment (passed by dnsmasq)
try:
interface = os.environ['DNSMASQ_INTERFACE']
except:
exit(1)
# Get the ID of the interface (the digits)
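# e.g. a DNSMASQ_INTERFACE of br100 (the PVC bridge device) yields VNI 100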
network_vni = re.findall(r'\d+', interface)[0]
# Create the key
zookeeper_key = '/networks/{}/dhcp_leases'.format(network_vni)
return zookeeper_key
def get_lease_expiry():
try:
expiry = os.environ['DNSMASQ_LEASE_EXPIRES']
except:
expiry = '0'
return expiry
def get_client_id():
try:
client_id = os.environ['DNSMASQ_CLIENT_ID']
except:
client_id = '*'
return client_id
def connect_zookeeper():
# We expect the environ to contain the config file
try:
pvcd_config_file = os.environ['PVCD_CONFIG_FILE']
except:
# Default place
pvcd_config_file = '/etc/pvc/pvcd.conf'
o_config = configparser.ConfigParser()
o_config.read(pvcd_config_file)
try:
zk_host = o_config['default']['coordinators']
except:
try:
zk_host = o_config[socket.gethostname()]['coordinators']
except:
exit(1)
zk_conn = kazoo.client.KazooClient(hosts=zk_host)
try:
zk_conn.start()
except:
exit(1)
return zk_conn
def read_data(zk_conn, key):
return zk_conn.get(key)[0].decode('ascii')
def get_lease(zk_conn, zk_leases_key, macaddr):
expiry = read_data(zk_conn, '{}/{}/expiry'.format(zk_leases_key, macaddr))
ipaddr = read_data(zk_conn, '{}/{}/ipaddr'.format(zk_leases_key, macaddr))
hostname = read_data(zk_conn, '{}/{}/hostname'.format(zk_leases_key, macaddr))
clientid = read_data(zk_conn, '{}/{}/clientid'.format(zk_leases_key, macaddr))
return expiry, ipaddr, hostname, clientid
#
# Command Functions
#
def read_lease_database(zk_conn, zk_leases_key):
leases_list = zk_conn.get_children(zk_leases_key)
output_list = []
for macaddr in leases_list:
expiry, ipaddr, hostname, clientid = get_lease(zk_conn, zk_leases_key, macaddr)
data_string = '{} {} {} {} {}'.format(expiry, macaddr, ipaddr, hostname, clientid)
print('Reading lease from Zookeeper: {}'.format(data_string), file=sys.stderr)
output_list.append('{}'.format(data_string))
# Output list
print('\n'.join(output_list))
def add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clientid):
transaction = zk_conn.transaction()
transaction.create('{}/{}'.format(zk_leases_key, macaddr), ''.encode('ascii'))
transaction.create('{}/{}/expiry'.format(zk_leases_key, macaddr), expiry.encode('ascii'))
transaction.create('{}/{}/ipaddr'.format(zk_leases_key, macaddr), ipaddr.encode('ascii'))
transaction.create('{}/{}/hostname'.format(zk_leases_key, macaddr), hostname.encode('ascii'))
transaction.create('{}/{}/clientid'.format(zk_leases_key, macaddr), clientid.encode('ascii'))
transaction.commit()
def del_lease(zk_conn, zk_leases_key, macaddr, expiry):
zk_conn.delete('{}/{}'.format(zk_leases_key, macaddr), recursive=True)
#
# Instantiate the parser
#
parser = argparse.ArgumentParser(description='Store or retrieve dnsmasq leases in Zookeeper')
parser.add_argument('action', type=str, help='Action')
parser.add_argument('macaddr', type=str, help='MAC Address', nargs='?', default=None)
parser.add_argument('ipaddr', type=str, help='IP Address', nargs='?', default=None)
parser.add_argument('hostname', type=str, help='Hostname', nargs='?', default=None)
args = parser.parse_args()
action = args.action
macaddr = args.macaddr
ipaddr = args.ipaddr
hostname = args.hostname
zk_conn = connect_zookeeper()
zk_leases_key = get_zookeeper_key()
if action == 'init':
read_lease_database(zk_conn, zk_leases_key)
exit(0)
expiry = get_lease_expiry()
clientid = get_client_id()
#
# Choose action
#
print('Lease action - {} {} {} {}'.format(action, macaddr, ipaddr, hostname), file=sys.stderr)
if action == 'add':
add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clientid)
elif action == 'del':
del_lease(zk_conn, zk_leases_key, macaddr, expiry)
elif action == 'old':
pass

node-daemon/pvcd/log.py
View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
# log.py - Output (stdout + logfile) functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import datetime
class Logger(object):
# Define a logger class for a daemon instance
# Keeps a record of where to log, and is passed messages which are
# formatted in various ways based on secondary characteristics.
# ANSI colours for output
fmt_red = '\033[91m'
fmt_blue = '\033[94m'
fmt_cyan = '\033[96m'
fmt_green = '\033[92m'
fmt_yellow = '\033[93m'
fmt_purple = '\033[95m'
fmt_bold = '\033[1m'
fmt_end = '\033[0m'
# Initialization of instance
def __init__(self, config):
self.config = config
if self.config['file_logging'] == 'True':
self.logfile = self.config['log_directory'] + '/pvc.log'
# We open the logfile for the duration of our session, but have a hup function
self.writer = open(self.logfile, 'a', buffering=1)
self.last_colour = self.fmt_cyan
# Provide a hup function to close and reopen the writer
def hup(self):
self.writer.close()
self.writer = open(self.logfile, 'a', buffering=1)
# Output function
def out(self, message, state='', prefix=''):
# Get the date
date = '{} - '.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
endc = Logger.fmt_end
# Determine the formatting
# OK
if state == 'o':
colour = Logger.fmt_green
prompt = '>>> '
# Error
elif state == 'e':
colour = Logger.fmt_red
prompt = '>>> '
# Warning
elif state == 'w':
colour = Logger.fmt_yellow
prompt = '>>> '
# Tick
elif state == 't':
colour = Logger.fmt_purple
prompt = '>>> '
# Information
elif state == 'i':
colour = Logger.fmt_blue
prompt = '>>> '
# Startup
elif state == 's':
colour = Logger.fmt_cyan
prompt = '>>> '
# Continuation
else:
date = ''
colour = self.last_colour
prompt = '>>> '
# Append space to prefix
if prefix != '':
prefix = prefix + ' - '
message = colour + prompt + endc + date + prefix + message
print(message)
if self.config['file_logging'] == 'True':
self.writer.write(message + '\n')
self.last_colour = colour

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# zkhandler.py - Secure versioned ZooKeeper updates
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import kazoo.client
import pvcd.log as log
# Child list function
def listchildren(zk_conn, key):
children = zk_conn.get_children(key)
return children
# Key deletion function
def delete(zk_conn, key):
zk_conn.delete(key, recursive=True)
# Data read function
def readdata(zk_conn, key):
data_raw = zk_conn.get(key)
data = data_raw[0].decode('ascii')
meta = data_raw[1]
return data
# Data write function
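# Takes a dict of {key: data} pairs; creates any missing keys and updates existing
# ones with a version check, all within a single atomic transaction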
def writedata(zk_conn, kv):
# Start up a transaction
zk_transaction = zk_conn.transaction()
# Proceed one KV pair at a time
for key in sorted(kv):
data = kv[key]
if not data:
data = ''
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, data.encode('ascii'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
version = orig_data[1].version
# Set what we expect the new version to be
new_version = version + 1
# Update the data
zk_transaction.set_data(key, data.encode('ascii'))
# Set up the check
try:
zk_transaction.check(key, new_version)
except TypeError:
print('Zookeeper key "{}" does not match expected version'.format(key))
return False
# Commit the transaction
try:
zk_transaction.commit()
return True
except Exception:
return False