Move virt daemon location

2018-09-10 14:21:27 -04:00
parent d9c38aa3fb
commit 32e3da1acc
10 changed files with 4 additions and 4 deletions


@@ -0,0 +1,43 @@
# pvcd cluster configuration file example
#
# This configuration file specifies details for this node in PVC. Multiple host
# blocks can be added but only the one matching the current system hostname will
# be used by the local daemon. Default values apply to all hosts for any value
# not specifically overridden.
#
# The following values are required for each host or in a default section:
# zookeeper: the IP+port of the Zookeeper instance (defaults to 127.0.0.1:2181)
# keepalive_interval: the interval between keepalives and for dead node timeout (defaults to 5)
# fence_intervals: the number of keepalive_intervals without Zookeeper contact before this node
# will consider another node dead and fence it (defaults to 6, i.e. 30s)
# suicide_intervals: the number of keepalive_intervals without Zookeeper contact before this
# node will consider itself failed and terminate all running VMs (defaults
# to 0, i.e. disabled); should be less than "fence_intervals"
# successful_fence: the action to take on a successful fencing operation; can be "none" or
# "migrate" (defaults to "migrate")
# failed_fence: the action to take on a failed fencing operation; can be "none" or "migrate"
# (defaults to "none"); "migrate" requires "suicide_intervals" to be set
# NOTE: POTENTIALLY DANGEROUS - see README for details
# migration_target_selector: the method to use to select target hypervisor nodes during a
# flush action; can be "mem", "load", "vcpus", or "vms" (defaults
# to "mem"); the best choice based on this field is selected for
# each VM to be migrated
# ipmi_hostname: the IPMI hostname for fencing (defaults to <shortname>-lom.<domain>)
# ipmi_username: username to connect to IPMI
# ipmi_password: password to connect to IPMI
#
# Copy this example to /etc/pvc/pvcd.conf and edit to your needs
[default]
zookeeper = 127.0.0.1:2181
keepalive_interval = 5
fence_intervals = 6
suicide_intervals = 0
successful_fence = migrate
failed_fence = none
migration_target_selector = mem
[myhost]
ipmi_username = admin
ipmi_password = admin
ipmi_hostname = myhost-lom
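
As a worked example of the timing values documented above (an illustrative sketch, not part of the shipped file):

keepalive_interval = 5    # seconds between keepalives (default)
fence_intervals = 6       # missed intervals tolerated before fencing (default)
dead_after = keepalive_interval * fence_intervals    # a node is fenceable after 30 seconds of silence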

virtualization-daemon/pvcd.py (executable file)

@@ -0,0 +1,23 @@
#!/usr/bin/env python3
# pvcd.py - Daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
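# All of the daemon logic lives in the pvcd package; importing pvcd.Daemon runs it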
import pvcd.Daemon


@@ -0,0 +1,15 @@
# Parallel Virtual Cluster node daemon unit file
[Unit]
Description = Parallel Virtual Cluster node daemon
After = network-online.target libvirtd.service zookeeper.service
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVCD_CONFIG_FILE=/etc/pvc/pvcd.conf
ExecStart = /usr/share/pvc/pvcd.py
Restart = on-failure
[Install]
WantedBy = multi-user.target
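
A unit file like this would typically be installed as /etc/systemd/system/pvcd.service and activated with "systemctl daemon-reload" followed by "systemctl enable --now pvcd.service"; the ExecStart and config paths assume the layout used by this commit.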


@@ -0,0 +1,271 @@
#!/usr/bin/env python3
# Daemon.py - PVC hypervisor node daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import kazoo.client
import libvirt
import sys
import os
import signal
import socket
import psutil
import subprocess
import uuid
import time
import configparser
import apscheduler.schedulers.background
import pvcd.ansiiprint as ansiiprint
import pvcd.zkhandler as zkhandler
import pvcd.VMInstance as VMInstance
import pvcd.NodeInstance as NodeInstance
print(ansiiprint.bold() + "pvcd - Parallel Virtual Cluster management daemon" + ansiiprint.end())
# Get the config file variable from the environment
try:
pvcd_config_file = os.environ['PVCD_CONFIG_FILE']
except:
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcd.')
exit(1)
myhostname = socket.gethostname()
myshorthostname = myhostname.split('.', 1)[0]
mydomainname = ''.join(myhostname.split('.', 1)[1:])
# Config values dictionary
config_values = [
'zookeeper',
'keepalive_interval',
'fence_intervals',
'suicide_intervals',
'successful_fence',
'failed_fence',
'migration_target_selector',
'ipmi_hostname',
'ipmi_username',
'ipmi_password'
]
def readConfig(pvcd_config_file, myhostname):
print('Loading configuration from file {}'.format(pvcd_config_file))
o_config = configparser.ConfigParser()
o_config.read(pvcd_config_file)
config = {}
try:
entries = o_config[myhostname]
except:
try:
entries = o_config['default']
except Exception as e:
print('ERROR: Config file is not valid!')
exit(1)
for entry in config_values:
try:
config[entry] = entries[entry]
except:
try:
config[entry] = o_config['default'][entry]
except:
print('ERROR: Config file missing required value "{}" for this host!'.format(entry))
exit(1)
# Handle an empty ipmi_hostname
if config['ipmi_hostname'] == '':
config['ipmi_hostname'] = myshorthostname + '-lom.' + mydomainname
return config
# Get config
config = readConfig(pvcd_config_file, myhostname)
# Check that libvirtd is listening on TCP
libvirt_check_name = "qemu+tcp://127.0.0.1:16509/system"
try:
print('Connecting to Libvirt instance at {}'.format(libvirt_check_name))
lv_conn = libvirt.open(libvirt_check_name)
if lv_conn == None:
raise
except:
print('ERROR: Failed to open local libvirt connection via TCP; required for PVC!')
exit(1)
lv_conn.close()
# Connect to local zookeeper
zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper'])
try:
print('Connecting to Zookeeper instance at {}'.format(config['zookeeper']))
zk_conn.start()
except:
print('ERROR: Failed to connect to Zookeeper')
exit(1)
# Handle zookeeper failures
def zk_listener(state):
global zk_conn, update_timer
if state == kazoo.client.KazooState.SUSPENDED:
ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e')
# Stop keepalive thread
stopKeepaliveTimer(update_timer)
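        # Build replacement clients until one starts, then swap it in for the module-level zk_conn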
while True:
_zk_conn = kazoo.client.KazooClient(hosts=config['zookeeper'])
try:
_zk_conn.start()
zk_conn = _zk_conn
break
except:
time.sleep(1)
elif state == kazoo.client.KazooState.CONNECTED:
ansiiprint.echo('Connection to Zookeeper started', '', 'o')
# Start keepalive thread
update_timer = createKeepaliveTimer()
else:
pass
zk_conn.add_listener(zk_listener)
# Cleanup function
def cleanup(signum, frame):
ansiiprint.echo('Terminating daemon', '', 'e')
# Set stop state in Zookeeper
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' })
# Close the Zookeeper connection
try:
zk_conn.stop()
zk_conn.close()
except:
pass
# Stop keepalive thread
stopKeepaliveTimer(update_timer)
# Exit
sys.exit(0)
# Handle signals gracefully
signal.signal(signal.SIGTERM, cleanup)
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGQUIT, cleanup)
# Gather useful data about our host for staticdata
# Static data format: 'cpu_count', 'kernel', 'os', 'arch'
staticdata = []
staticdata.append(str(psutil.cpu_count()))
staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
# Print static data on start
print('{0}Node hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), myhostname))
print('{0}IPMI hostname:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), config['ipmi_hostname']))
print('{0}Machine details:{1}'.format(ansiiprint.bold(), ansiiprint.end()))
print(' {0}CPUs:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[0]))
print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[1]))
print(' {0}OS:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[2]))
print(' {0}Arch:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticdata[3]))
# Check if our node exists in Zookeeper, and create it if not
if zk_conn.exists('/nodes/{}'.format(myhostname)):
print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper")
# Update static data just in case it's changed
zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) })
else:
print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node")
keepalive_time = int(time.time())
transaction = zk_conn.transaction()
transaction.create('/nodes/{}'.format(myhostname), 'hypervisor'.encode('ascii'))
# Basic state information
transaction.create('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii'))
transaction.create('/nodes/{}/domainstate'.format(myhostname), 'ready'.encode('ascii'))
transaction.create('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata).encode('ascii'))
transaction.create('/nodes/{}/memfree'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/memused'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/memalloc'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/vcpualloc'.format(myhostname), '0'.encode('ascii'))
transaction.create('/nodes/{}/cpuload'.format(myhostname), '0.0'.encode('ascii'))
transaction.create('/nodes/{}/runningdomains'.format(myhostname), ''.encode('ascii'))
transaction.create('/nodes/{}/domainscount'.format(myhostname), '0'.encode('ascii'))
# Keepalives and fencing information
transaction.create('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time).encode('ascii'))
transaction.create('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname'].encode('ascii'))
transaction.create('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username'].encode('ascii'))
transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii'))
transaction.commit()
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' })
t_node = dict()
s_domain = dict()
node_list = []
domain_list = []
@zk_conn.ChildrenWatch('/nodes')
def updatenodes(new_node_list):
global node_list
node_list = new_node_list
print(ansiiprint.blue() + 'Node list: ' + ansiiprint.end() + '{}'.format(' '.join(node_list)))
for node in node_list:
if node in t_node:
t_node[node].updatenodelist(t_node)
else:
t_node[node] = NodeInstance.NodeInstance(myhostname, node, t_node, s_domain, zk_conn, config)
@zk_conn.ChildrenWatch('/domains')
def updatedomains(new_domain_list):
global domain_list
domain_list = new_domain_list
print(ansiiprint.blue() + 'Domain list: ' + ansiiprint.end() + '{}'.format(' '.join(domain_list)))
for domain in domain_list:
if not domain in s_domain:
            s_domain[domain] = VMInstance.VMInstance(domain, zk_conn, config, t_node[myhostname])
for node in node_list:
if node in t_node:
t_node[node].updatedomainlist(s_domain)
# Set up our update function
this_node = t_node[myhostname]
update_zookeeper = this_node.update_zookeeper
# Create timer to update this node in Zookeeper
def createKeepaliveTimer():
interval = int(config['keepalive_interval'])
ansiiprint.echo('Starting keepalive timer ({} second interval)'.format(interval), '', 'o')
update_timer = apscheduler.schedulers.background.BackgroundScheduler()
update_timer.add_job(update_zookeeper, 'interval', seconds=interval)
update_timer.start()
return update_timer
def stopKeepaliveTimer(update_timer):
ansiiprint.echo('Stopping keepalive timer', '', 'c')
update_timer.shutdown()
# Start keepalive thread
update_timer = createKeepaliveTimer()
# Tick loop
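# (the main thread only idles here; the real work happens in Zookeeper watch callbacks and the keepalive timer thread)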
while True:
try:
time.sleep(0.1)
except:
break


@@ -0,0 +1,501 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node and run by pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os, sys, psutil, socket, time, libvirt, kazoo.client, threading, subprocess
import pvcd.ansiiprint as ansiiprint
import pvcd.zkhandler as zkhandler
class NodeInstance():
# Initialization function
def __init__(self, this_node, name, t_node, s_domain, zk_conn, config):
# Passed-in variables on creation
self.zk_conn = zk_conn
self.config = config
self.this_node = this_node
self.name = name
self.daemon_state = 'stop'
self.domain_state = 'ready'
self.t_node = t_node
self.active_node_list = []
self.flushed_node_list = []
self.inactive_node_list = []
self.s_domain = s_domain
self.domain_list = []
self.ipmi_hostname = self.config['ipmi_hostname']
self.domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
self.inflush = False
# Zookeeper handlers for changed states
@zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
def watch_hypervisor_daemonstate(data, stat, event=""):
try:
self.daemon_state = data.decode('ascii')
except AttributeError:
self.daemon_state = 'stop'
@zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
def watch_hypervisor_domainstate(data, stat, event=""):
try:
self.domain_state = data.decode('ascii')
except AttributeError:
self.domain_state = 'unknown'
# toggle state management of this node
if self.name == self.this_node:
if self.domain_state == 'flush' and self.inflush == False:
# Do flushing in a thread so it doesn't block the migrates out
flush_thread = threading.Thread(target=self.flush, args=(), kwargs={})
flush_thread.start()
if self.domain_state == 'unflush' and self.inflush == False:
self.unflush()
@zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
def watch_hypervisor_memfree(data, stat, event=""):
try:
self.memfree = data.decode('ascii')
except AttributeError:
self.memfree = 0
@zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
def watch_hypervisor_memused(data, stat, event=""):
try:
self.memused = data.decode('ascii')
except AttributeError:
self.memused = 0
@zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
def watch_hypervisor_memalloc(data, stat, event=""):
try:
self.memalloc = data.decode('ascii')
except AttributeError:
self.memalloc = 0
@zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
def watch_hypervisor_vcpualloc(data, stat, event=""):
try:
self.vcpualloc = data.decode('ascii')
except AttributeError:
self.vcpualloc = 0
@zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
def watch_hypervisor_runningdomains(data, stat, event=""):
try:
self.domain_list = data.decode('ascii').split()
except AttributeError:
self.domain_list = []
@zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
def watch_hypervisor_domainscount(data, stat, event=""):
try:
self.domains_count = data.decode('ascii')
except AttributeError:
self.domains_count = 0
# Get value functions
def getfreemem(self):
return self.memfree
def getallocmem(self):
return self.memalloc
def getallocvcpu(self):
return self.vcpualloc
def getcpuload(self):
return self.cpuload
def getname(self):
return self.name
def getdaemonstate(self):
return self.daemon_state
def getdomainstate(self):
return self.domain_state
def getdomainlist(self):
return self.domain_list
# Update value functions
def updatenodelist(self, t_node):
self.t_node = t_node
def updatedomainlist(self, s_domain):
self.s_domain = s_domain
# Flush all VMs on the host
def flush(self):
self.inflush = True
ansiiprint.echo('Flushing node "{}" of running VMs'.format(self.name), '', 'i')
ansiiprint.echo('Domain list: {}'.format(', '.join(self.domain_list)), '', 'c')
fixed_domain_list = self.domain_list.copy()
for dom_uuid in fixed_domain_list:
ansiiprint.echo('Selecting target to migrate VM "{}"'.format(dom_uuid), '', 'i')
current_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(dom_uuid))
            target_hypervisor = findTargetHypervisor(self.zk_conn, self.config['migration_target_selector'], dom_uuid)
if target_hypervisor == None:
ansiiprint.echo('Failed to find migration target for VM "{}"; shutting down'.format(dom_uuid), '', 'e')
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' })
else:
ansiiprint.echo('Migrating VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor,
'/domains/{}/lasthypervisor'.format(dom_uuid): current_hypervisor
})
            # Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate serially anyway)
while True:
time.sleep(1)
vm_current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid))
if vm_current_state == "start":
break
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' })
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
self.inflush = False
def unflush(self):
self.inflush = True
ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i')
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' })
fixed_domain_list = self.s_domain.copy()
for dom_uuid in fixed_domain_list:
try:
last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid))
except:
continue
if last_hypervisor != self.name:
continue
ansiiprint.echo('Setting unmigration for VM "{}"'.format(dom_uuid), '', 'i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/hypervisor'.format(dom_uuid): self.name,
'/domains/{}/lasthypervisor'.format(dom_uuid): ''
})
self.inflush = False
def update_zookeeper(self):
# Connect to libvirt
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
ansiiprint.echo('Failed to open connection to "{}"'.format(libvirt_name), '', 'e')
return
# Get past state and update if needed
past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name))
if past_state != 'run':
self.daemon_state = 'run'
zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' })
else:
self.daemon_state = 'run'
# Toggle state management of dead VMs to restart them
memalloc = 0
vcpualloc = 0
for domain, instance in self.s_domain.items():
if instance.inshutdown == False and domain in self.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == 'start' and instance.gethypervisor() == self.name:
if instance.getdom() != None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
raise
except Exception as e:
# Toggle a state "change"
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
# Ensure that any running VMs are readded to the domain_list
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
for domain in running_domains:
domain_uuid = domain.UUIDString()
if domain_uuid not in self.domain_list:
self.domain_list.append(domain_uuid)
# Set our information in zookeeper
self.name = lv_conn.getHostname()
self.memused = int(psutil.virtual_memory().used / 1024 / 1024)
self.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
self.memalloc = memalloc
self.vcpualloc = vcpualloc
self.cpuload = os.getloadavg()[0]
self.domains_count = len(lv_conn.listDomainsID())
keepalive_time = int(time.time())
try:
zkhandler.writedata(self.zk_conn, {
'/nodes/{}/memused'.format(self.name): str(self.memused),
'/nodes/{}/memfree'.format(self.name): str(self.memfree),
'/nodes/{}/memalloc'.format(self.name): str(self.memalloc),
'/nodes/{}/vcpualloc'.format(self.name): str(self.vcpualloc),
'/nodes/{}/cpuload'.format(self.name): str(self.cpuload),
'/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list),
'/nodes/{}/domainscount'.format(self.name): str(self.domains_count),
'/nodes/{}/keepalive'.format(self.name): str(keepalive_time)
})
except:
ansiiprint.echo('Failed to set keepalive data', '', 'e')
return
# Close the Libvirt connection
lv_conn.close()
# Display node information to the terminal
ansiiprint.echo('{}{} keepalive{}'.format(ansiiprint.purple(), self.name, ansiiprint.end()), '', 't')
ansiiprint.echo('{0}Active domains:{1} {2} {0}Allocated memory [MiB]:{1} {6} {0}Free memory [MiB]:{1} {3} {0}Used memory [MiB]:{1} {4} {0}Load:{1} {5}'.format(ansiiprint.bold(), ansiiprint.end(), self.domains_count, self.memfree, self.memused, self.cpuload, self.memalloc), '', 'c')
# Update our local node lists
for node_name in self.t_node:
try:
node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name))
node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name))
node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name)))
except:
node_daemon_state = 'unknown'
node_domain_state = 'unknown'
node_keepalive = 0
            # Handle deadtime and fencing if needed
            # (A node is considered dead when its keepalive timestamp is more than
            #  keepalive_interval*fence_intervals seconds out-of-date while its daemon
            #  state is 'run'; with the defaults of 5 and 6 this is 30 seconds)
node_deadtime = int(time.time()) - ( int(self.config['keepalive_interval']) * int(self.config['fence_intervals']) )
if node_keepalive < node_deadtime and node_daemon_state == 'run':
ansiiprint.echo('Node {} seems dead - starting monitor for fencing'.format(node_name), '', 'w')
zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn, self.config), kwargs={})
fence_thread.start()
# Update the arrays
if node_daemon_state == 'run' and node_domain_state != 'flushed' and node_name not in self.active_node_list:
self.active_node_list.append(node_name)
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_daemon_state != 'run' and node_domain_state != 'flushed' and node_name not in self.inactive_node_list:
self.inactive_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
if node_domain_state == 'flushed' and node_name not in self.flushed_node_list:
self.flushed_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
# Display cluster information to the terminal
ansiiprint.echo('{}Cluster status{}'.format(ansiiprint.purple(), ansiiprint.end()), '', 't')
ansiiprint.echo('{}Active nodes:{} {}'.format(ansiiprint.bold(), ansiiprint.end(), ' '.join(self.active_node_list)), '', 'c')
ansiiprint.echo('{}Inactive nodes:{} {}'.format(ansiiprint.bold(), ansiiprint.end(), ' '.join(self.inactive_node_list)), '', 'c')
ansiiprint.echo('{}Flushed nodes:{} {}'.format(ansiiprint.bold(), ansiiprint.end(), ' '.join(self.flushed_node_list)), '', 'c')
#
# Find a migration target
#
def findTargetHypervisor(zk_conn, search_field, dom_uuid):
if search_field == 'mem':
return findTargetHypervisorMem(zk_conn, dom_uuid)
if search_field == 'load':
return findTargetHypervisorLoad(zk_conn, dom_uuid)
if search_field == 'vcpus':
return findTargetHypervisorVCPUs(zk_conn, dom_uuid)
if search_field == 'vms':
return findTargetHypervisorVMs(zk_conn, dom_uuid)
return None
# Get the list of valid target hypervisors
def getHypervisors(zk_conn, dom_uuid):
valid_hypervisor_list = []
full_hypervisor_list = zkhandler.listchildren(zk_conn, '/nodes')
current_hypervisor = zkhandler.readdata(zk_conn, '/domains/{}/hypervisor'.format(dom_uuid))
for hypervisor in full_hypervisor_list:
daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(hypervisor))
domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(hypervisor))
if hypervisor == current_hypervisor:
continue
if daemon_state != 'run' or domain_state != 'ready':
continue
valid_hypervisor_list.append(hypervisor)
return valid_hypervisor_list
# via free memory (relative to allocated memory)
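# "allocfree" below measures provisionable headroom: total node RAM (used plus
# free) minus the memory already allocated to domains; the candidate with the
# most headroom wins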
def findTargetHypervisorMem(zk_conn, dom_uuid):
most_allocfree = 0
target_hypervisor = None
hypervisor_list = getHypervisors(zk_conn, dom_uuid)
for hypervisor in hypervisor_list:
memalloc = int(zkhandler.readdata(zk_conn, '/nodes/{}/memalloc'.format(hypervisor)))
memused = int(zkhandler.readdata(zk_conn, '/nodes/{}/memused'.format(hypervisor)))
memfree = int(zkhandler.readdata(zk_conn, '/nodes/{}/memfree'.format(hypervisor)))
memtotal = memused + memfree
allocfree = memtotal - memalloc
if allocfree > most_allocfree:
most_allocfree = allocfree
target_hypervisor = hypervisor
return target_hypervisor
# via load average
def findTargetHypervisorLoad(zk_conn, dom_uuid):
least_load = 9999
target_hypervisor = None
hypervisor_list = getHypervisors(zk_conn, dom_uuid)
for hypervisor in hypervisor_list:
        load = float(zkhandler.readdata(zk_conn, '/nodes/{}/cpuload'.format(hypervisor)))
        if load < least_load:
            least_load = load
            target_hypervisor = hypervisor
return target_hypervisor
# via total vCPUs
def findTargetHypervisorVCPUs(zk_conn, dom_uuid):
least_vcpus = 9999
target_hypervisor = None
hypervisor_list = getHypervisors(zk_conn, dom_uuid)
for hypervisor in hypervisor_list:
vcpus = int(zkhandler.readdata(zk_conn, '/nodes/{}/vcpualloc'.format(hypervisor)))
if vcpus < least_vcpus:
least_vcpus = vcpus
target_hypervisor = hypervisor
return target_hypervisor
# via total VMs
def findTargetHypervisorVMs(zk_conn, dom_uuid):
least_vms = 9999
target_hypervisor = None
hypervisor_list = getHypervisors(zk_conn, dom_uuid)
for hypervisor in hypervisor_list:
vms = int(zkhandler.readdata(zk_conn, '/nodes/{}/domainscount'.format(hypervisor)))
if vms < least_vms:
least_vms = vms
target_hypervisor = hypervisor
return target_hypervisor
#
# Fence thread entry function
#
def fenceNode(node_name, zk_conn, config):
failcount = 0
# We allow exactly 3 saving throws for the host to come back online
while failcount < 3:
# Wait 5 seconds
time.sleep(5)
# Get the state
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
# Is it still 'dead'
if node_daemon_state == 'dead':
failcount += 1
ansiiprint.echo('Node "{}" failed {} saving throws'.format(node_name, failcount), '', 'w')
# It changed back to something else so it must be alive
else:
ansiiprint.echo('Node "{}" passed a saving throw; canceling fence'.format(node_name), '', 'o')
return
ansiiprint.echo('Fencing node "{}" via IPMI reboot signal'.format(node_name), '', 'e')
# Get IPMI information
ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))
# Shoot it in the head
fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password)
# Hold to ensure the fence takes effect
time.sleep(3)
# If the fence succeeded and successful_fence is migrate
if fence_status == True and config['successful_fence'] == 'migrate':
migrateFromFencedHost(zk_conn, node_name)
# If the fence failed and failed_fence is migrate
if fence_status == False and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
migrateFromFencedHost(zk_conn, node_name)
# Migrate hosts away from a fenced node
def migrateFromFencedHost(zk_conn, node_name):
ansiiprint.echo('Moving VMs from dead hypervisor "{}" to new hosts'.format(node_name), '', 'i')
dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
for dom_uuid in dead_node_running_domains:
target_hypervisor = findTargetHypervisor(zk_conn, 'mem', dom_uuid)
ansiiprint.echo('Moving VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i')
zkhandler.writedata(zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'start',
'/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor,
'/domains/{}/lasthypervisor'.format(dom_uuid): node_name
})
# Set node in flushed state for easy remigrating when it comes back
zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })
#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command = ['/usr/bin/ipmitool', '-I', 'lanplus', '-H', ipmi_hostname, '-U', ipmi_user, '-P', ipmi_password, 'chassis', 'power', 'reset']
ipmi_command_output = subprocess.run(ipmi_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if ipmi_command_output.returncode == 0:
ansiiprint.echo('Successfully rebooted dead node', '', 'o')
return True
else:
ansiiprint.echo('Failed to reboot dead node', '', 'e')
return False


@@ -0,0 +1,424 @@
#!/usr/bin/env python3
# VMInstance.py - Class implementing a PVC virtual machine and run by pvcd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os, sys, uuid, socket, time, threading, libvirt, kazoo.client
import pvcd.ansiiprint as ansiiprint
import pvcd.zkhandler as zkhandler
class VMInstance:
# Initialization function
def __init__(self, domuuid, zk_conn, config, thishypervisor):
# Passed-in variables on creation
self.domuuid = domuuid
self.zk_conn = zk_conn
self.config = config
self.thishypervisor = thishypervisor
# These will all be set later
self.hypervisor = None
self.state = None
self.instart = False
self.inrestart = False
self.inmigrate = False
self.inreceive = False
self.inshutdown = False
self.instop = False
self.dom = self.lookupByUUID(self.domuuid)
# Watch for changes to the state field in Zookeeper
@zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
def watch_state(data, stat, event=""):
            # If we get a delete state, just terminate ourselves
if data == None:
return
# Otherwise perform a management command
else:
self.manage_vm_state()
# Get data functions
def getstate(self):
return self.state
def gethypervisor(self):
return self.hypervisor
def getdom(self):
return self.dom
def getmemory(self):
try:
memory = int(self.dom.info()[2] / 1024)
except:
memory = 0
return memory
def getvcpus(self):
try:
vcpus = int(self.dom.info()[3])
except:
vcpus = 0
return vcpus
# Manage local node domain_list
def addDomainToList(self):
if not self.domuuid in self.thishypervisor.domain_list:
try:
# Add the domain to the domain_list array
self.thishypervisor.domain_list.append(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) })
except Exception as e:
ansiiprint.echo('Error adding domain to list: {}'.format(e), '', 'c')
def removeDomainFromList(self):
if self.domuuid in self.thishypervisor.domain_list:
try:
# Remove the domain from the domain_list array
self.thishypervisor.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) })
except Exception as e:
ansiiprint.echo('Error removing domain from list: {}'.format(e), '', 'c')
# Start up the VM
def start_vm(self):
ansiiprint.echo('Starting VM', '{}:'.format(self.domuuid), 'i')
self.instart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
ansiiprint.echo('Failed to open local libvirt connection', '{}:'.format(self.domuuid), 'e')
self.instart = False
return
# Try to get the current state in case it's already running
try:
self.dom = self.lookupByUUID(self.domuuid)
curstate = self.dom.state()[0]
except:
curstate = 'notstart'
if curstate == libvirt.VIR_DOMAIN_RUNNING:
# If it is running just update the model
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
else:
# Or try to create it
try:
# Grab the domain information from Zookeeper
xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList()
ansiiprint.echo('Successfully started VM', '{}:'.format(self.domuuid), 'o')
self.dom = dom
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
except libvirt.libvirtError as e:
ansiiprint.echo('Failed to create VM', '{}:'.format(self.domuuid), 'e')
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'failed' })
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) })
self.dom = None
lv_conn.close()
self.instart = False
# Restart the VM
def restart_vm(self):
ansiiprint.echo('Restarting VM', '{}:'.format(self.domuuid), 'i')
self.inrestart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
ansiiprint.echo('Failed to open local libvirt connection', '{}:'.format(self.domuuid), 'e')
self.inrestart = False
return
self.shutdown_vm()
self.start_vm()
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
lv_conn.close()
self.inrestart = False
# Stop the VM forcibly without updating state
def terminate_vm(self):
ansiiprint.echo('Terminating VM', '{}:'.format(self.domuuid), 'i')
self.instop = True
try:
self.dom.destroy()
except AttributeError:
ansiiprint.echo('Failed to terminate VM', '{}:'.format(self.domuuid), 'e')
self.removeDomainFromList()
ansiiprint.echo('Successfully terminated VM', '{}:'.format(self.domuuid), 'o')
self.dom = None
self.instop = False
# Stop the VM forcibly
def stop_vm(self):
ansiiprint.echo('Forcibly stopping VM', '{}:'.format(self.domuuid), 'i')
self.instop = True
try:
self.dom.destroy()
except AttributeError:
ansiiprint.echo('Failed to stop VM', '{}:'.format(self.domuuid), 'e')
self.removeDomainFromList()
if self.inrestart == False:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
ansiiprint.echo('Successfully stopped VM', '{}:'.format(self.domuuid), 'o')
self.dom = None
self.instop = False
# Shutdown the VM gracefully
def shutdown_vm(self):
ansiiprint.echo('Gracefully stopping VM', '{}:'.format(self.domuuid), 'i')
self.inshutdown = True
self.dom.shutdown()
try:
tick = 0
while self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING and tick < 60:
tick += 1
time.sleep(0.5)
if tick >= 60:
ansiiprint.echo('Shutdown timeout expired', '{}:'.format(self.domuuid), 'e')
self.stop_vm()
self.inshutdown = False
return
except:
pass
self.removeDomainFromList()
if self.inrestart == False:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
ansiiprint.echo('Successfully shutdown VM', '{}:'.format(self.domuuid), 'o')
self.dom = None
self.inshutdown = False
def live_migrate_vm(self, dest_hypervisor):
try:
dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.hypervisor))
if dest_lv_conn == None:
raise
except:
ansiiprint.echo('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.hypervisor), '{}:'.format(self.domuuid), 'e')
return 1
try:
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0)
if target_dom == None:
raise
ansiiprint.echo('Successfully migrated VM', '{}:'.format(self.domuuid), 'o')
except:
dest_lv_conn.close()
return 1
dest_lv_conn.close()
return 0
# Migrate the VM to a target host
def migrate_vm(self):
self.inmigrate = True
ansiiprint.echo('Migrating VM to hypervisor "{}"'.format(self.hypervisor), '{}:'.format(self.domuuid), 'i')
try:
migrate_ret = self.live_migrate_vm(self.hypervisor)
except:
migrate_ret = 0
if migrate_ret != 0:
ansiiprint.echo('Could not live migrate VM; shutting down to migrate instead', '{}:'.format(self.domuuid), 'e')
self.shutdown_vm()
time.sleep(1)
else:
self.removeDomainFromList()
time.sleep(1)
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.inmigrate = False
# Receive the migration from another host (wait until VM is running)
def receive_migrate(self):
self.inreceive = True
ansiiprint.echo('Receiving migration', '{}:'.format(self.domuuid), 'i')
while True:
time.sleep(0.5)
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
if self.dom == None and self.state == 'migrate':
continue
if self.state != 'migrate':
break
try:
if self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING:
break
except:
continue
try:
dom_state = self.dom.state()[0]
except AttributeError:
dom_state = None
if dom_state == libvirt.VIR_DOMAIN_RUNNING:
self.addDomainToList()
ansiiprint.echo('Successfully received migrated VM', '{}:'.format(self.domuuid), 'o')
else:
ansiiprint.echo('Failed to receive migrated VM', '{}:'.format(self.domuuid), 'e')
self.inreceive = False
#
# Main function to manage a VM (taking only self)
#
def manage_vm_state(self):
# Give ourselves a bit of leeway time
time.sleep(0.2)
# Get the current values from zookeeper (don't rely on the watch)
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(self.domuuid))
# Check the current state of the VM
try:
if self.dom != None:
running, reason = self.dom.state()
else:
raise
except:
running = libvirt.VIR_DOMAIN_NOSTATE
ansiiprint.echo('VM state change for "{}": {} {}'.format(self.domuuid, self.state, self.hypervisor), '', 'i')
#######################
# Handle state changes
#######################
# Valid states are:
# start
# migrate
# restart
# shutdown
# stop
# Conditional pass one - Are we already performing an action
if self.instart == False \
and self.inrestart == False \
and self.inmigrate == False \
and self.inreceive == False \
and self.inshutdown == False \
and self.instop == False:
# Conditional pass two - Is this VM configured to run on this hypervisor
if self.hypervisor == self.thishypervisor.name:
# Conditional pass three - Is this VM currently running on this hypervisor
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM is already running and should be
if self.state == "start":
self.addDomainToList()
# VM is already running and should be but stuck in migrate state
elif self.state == "migrate":
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.addDomainToList()
# VM should be restarted
elif self.state == "restart":
self.restart_vm()
# VM should be shut down
elif self.state == "shutdown":
self.shutdown_vm()
# VM should be stopped
elif self.state == "stop":
self.stop_vm()
else:
# VM should be started
if self.state == "start":
self.start_vm()
# VM should be migrated to this hypervisor
elif self.state == "migrate":
self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running)
if self.state == "restart":
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
# VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown":
self.removeDomainFromList()
                    # VM should be stopped; ensure it's gone from this node's domain_list
elif self.state == "stop":
self.removeDomainFromList()
else:
# Conditional pass three - Is this VM currently running on this hypervisor
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM should be migrated away from this hypervisor
if self.state == "migrate":
self.migrate_vm()
# VM should be terminated
else:
self.terminate_vm()
    # This function is a wrapper for libvirt.lookupByUUID which fixes some problems:
    #   1. Takes a text UUID and handles converting it to bytes
    #   2. Tries the lookup and returns None instead of raising if it fails
def lookupByUUID(self, tuuid):
lv_conn = None
dom = None
libvirt_name = "qemu:///system"
# Convert the text UUID to bytes
buuid = uuid.UUID(tuuid).bytes
# Try
try:
# Open a libvirt connection
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
ansiiprint.echo('Failed to open local libvirt connection', '{}:'.format(self.domuuid), 'e')
return dom
# Lookup the UUID
dom = lv_conn.lookupByUUID(buuid)
# Fail
except:
pass
# After everything
finally:
# Close the libvirt connection
if lv_conn != None:
lv_conn.close()
# Return the dom object (or None)
return dom


@@ -0,0 +1,80 @@
#!/usr/bin/env python3
# ansiiprint.py - Printing function for formatted messages
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import datetime
# ANSI colours for output
def red():
return '\033[91m'
def blue():
return '\033[94m'
def green():
return '\033[92m'
def yellow():
return '\033[93m'
def purple():
return '\033[95m'
def bold():
return '\033[1m'
def end():
return '\033[0m'
# Print function
def echo(message, prefix, state):
# Get the date
date = '{} - '.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
endc = end()
# Continuation
if state == 'c':
date = ''
colour = ''
prompt = ' '
# OK
elif state == 'o':
colour = green()
prompt = '>>> '
# Error
elif state == 'e':
colour = red()
prompt = '>>> '
# Warning
elif state == 'w':
colour = yellow()
prompt = '>>> '
# Tick
elif state == 't':
colour = purple()
prompt = '>>> '
# Information
elif state == 'i':
colour = blue()
prompt = '>>> '
else:
colour = bold()
prompt = '>>> '
# Append space to prefix
if prefix != '':
prefix = prefix + ' '
print(colour + prompt + endc + date + prefix + message)
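# Example (illustrative): echo('Node is up', 'node1:', 'o') prints a line like
#   ">>> 2018/09/10 14:21:27.000000 - node1: Node is up" with the prompt in green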


@@ -0,0 +1,60 @@
#!/usr/bin/env python3
# zkhandler.py - Secure versioned ZooKeeper updates
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import kazoo.client
import pvcd.ansiiprint as ansiiprint
# Child list function
def listchildren(zk_conn, key):
children = zk_conn.get_children(key)
return children
# Data read function
def readdata(zk_conn, key):
data_raw = zk_conn.get(key)
data = data_raw[0].decode('ascii')
meta = data_raw[1]
return data
# Data write function
def writedata(zk_conn, kv):
# Get the current version; we base this off the first key (ordering in multi-key calls is irrelevant)
first_key = list(kv.keys())[0]
orig_data_raw = zk_conn.get(first_key)
meta = orig_data_raw[1]
if meta == None:
ansiiprint.echo('Zookeeper key "{}" does not exist'.format(first_key), '', 'e')
return 1
version = meta.version
new_version = version + 1
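    # The check() queued below makes this a compare-and-set: the unconditional
    # set_data() leaves first_key at version+1 only if no other writer touched
    # it in between, so a concurrent write fails the whole multi-key commit
    # rather than silently overwriting newer data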
zk_transaction = zk_conn.transaction()
for key, data in kv.items():
zk_transaction.set_data(key, data.encode('ascii'))
try:
zk_transaction.check(first_key, new_version)
except TypeError:
ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(first_key), '', 'e')
return 1
zk_transaction.commit()
return 0
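# Example usage (illustrative; paths and values are hypothetical):
#   zk_conn = kazoo.client.KazooClient(hosts='127.0.0.1:2181')
#   zk_conn.start()
#   writedata(zk_conn, { '/nodes/node1/daemonstate': 'run' })
#   readdata(zk_conn, '/nodes/node1/daemonstate')   # -> 'run'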