diff --git a/pvcd/NodeInstance.py b/pvcd/NodeInstance.py index e0266228..c3657461 100644 --- a/pvcd/NodeInstance.py +++ b/pvcd/NodeInstance.py @@ -130,21 +130,21 @@ class NodeInstance(): for dom_uuid in self.domain_list: most_memfree = 0 target_hypervisor = None - hypervisor_list = self.zk_conn.get_children('/nodes') - current_hypervisor = self.zk_conn.get('/domains/{}/hypervisor'.format(dom_uuid))[0].decode('ascii') + hypervisor_list = zkhander.listchildren(self.zk_conn, '/nodes') + current_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(dom_uuid)) if current_hypervisor != self.this_node: continue for hypervisor in hypervisor_list: - daemon_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(hypervisor))[0].decode('ascii') - domain_state = self.zk_conn.get('/nodes/{}/domainstate'.format(hypervisor))[0].decode('ascii') + daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(hypervisor)) + domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(hypervisor)) if hypervisor == current_hypervisor: continue if daemon_state != 'run' or domain_state != 'ready': continue - memfree = int(self.zk_conn.get('/nodes/{}/memfree'.format(hypervisor))[0].decode('ascii')) + memfree = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/memfree'.format(hypervisor))) if memfree > most_memfree: most_memfree = memfree target_hypervisor = hypervisor @@ -171,7 +171,7 @@ class NodeInstance(): ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') self.zk_conn.set('/nodes/{}/domainstate'.format(self.name), 'ready'.encode('ascii')) for dom_uuid in self.s_domain: - last_hypervisor = self.zk_conn.get('/domains/{}/lasthypervisor'.format(dom_uuid))[0].decode('ascii') + last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid)) if last_hypervisor != self.name: continue @@ -193,7 +193,7 @@ class NodeInstance(): return # Get past state and update if needed - 
past_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(self.name))[0].decode('ascii') + past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name)) if past_state != 'run': self.daemon_state = 'run' self.zk_conn.set('/nodes/{}/daemonstate'.format(self.name), 'run'.encode('ascii')) @@ -241,9 +241,9 @@ class NodeInstance(): # Update our local node lists for node_name in self.t_node: try: - node_daemon_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(node_name))[0].decode('ascii') - node_domain_state = self.zk_conn.get('/nodes/{}/domainstate'.format(node_name))[0].decode('ascii') - node_keepalive = int(self.zk_conn.get('/nodes/{}/keepalive'.format(node_name))[0].decode('ascii')) + node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name)) + node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name)) + node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name))) except: node_daemon_state = 'unknown' node_domain_state = 'unknown' @@ -307,7 +307,7 @@ def fenceNode(node_name, zk_conn): # Wait 5 seconds time.sleep(5) # Get the state - node_daemon_state = zk_conn.get('/nodes/{}/daemonstate'.format(node_name))[0].decode('ascii') + node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) # Is it still 'dead' if node_daemon_state == 'dead': failcount += 1 @@ -319,26 +319,26 @@ def fenceNode(node_name, zk_conn): ansiiprint.echo('Fencing node "{}" via IPMI reboot signal'.format(node_name), '', 'e') - ipmi_hostname = zk_conn.get('/nodes/{}/ipmihostname'.format(node_name))[0].decode('ascii') - ipmi_username = zk_conn.get('/nodes/{}/ipmiusername'.format(node_name))[0].decode('ascii') - ipmi_password = zk_conn.get('/nodes/{}/ipmipassword'.format(node_name))[0].decode('ascii') + ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name)) + ipmi_username = 
zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name)) + ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name)) rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password) time.sleep(5) ansiiprint.echo('Moving VMs from dead hypervisor "{}" to new hosts'.format(node_name), '', 'i') - dead_node_running_domains = zk_conn.get('/nodes/{}/runningdomains'.format(node_name))[0].decode('ascii').split() + dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split() for dom_uuid in dead_node_running_domains: most_memfree = 0 - hypervisor_list = zk_conn.get_children('/nodes') - current_hypervisor = zk_conn.get('/domains/{}/hypervisor'.format(dom_uuid))[0].decode('ascii') + hypervisor_list = zkhandler.listchildren(zk_conn, '/nodes') + current_hypervisor = zkhandler.readdata(zk_conn, '/domains/{}/hypervisor'.format(dom_uuid)) for hypervisor in hypervisor_list: print(hypervisor) - daemon_state = zk_conn.get('/nodes/{}/daemonstate'.format(hypervisor))[0].decode('ascii') - domain_state = zk_conn.get('/nodes/{}/domainstate'.format(hypervisor))[0].decode('ascii') + daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(hypervisor)) + domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(hypervisor)) if daemon_state != 'run' or domain_state != 'ready': continue - memfree = int(zk_conn.get('/nodes/{}/memfree'.format(hypervisor))[0].decode('ascii')) + memfree = int(zkhandler.readdata(zk_conn, '/nodes/{}/memfree'.format(hypervisor))) if memfree > most_memfree: most_memfree = memfree target_hypervisor = hypervisor diff --git a/pvcd/VMInstance.py b/pvcd/VMInstance.py index f7be7796..71b0251a 100644 --- a/pvcd/VMInstance.py +++ b/pvcd/VMInstance.py @@ -22,6 +22,7 @@ import os, sys, uuid, socket, time, threading, libvirt, kazoo.client import pvcd.ansiiprint as ansiiprint +import pvcd.zkhandler as zkhandler class VMInstance: # Initialization function @@ 
-100,7 +101,7 @@ class VMInstance: try: # Grab the domain information from Zookeeper - xmlconfig = self.zk_conn.get('/domains/{}/xml'.format(self.domuuid))[0].decode('ascii') + xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid)) dom = lv_conn.createXML(xmlconfig, 0) self.addDomainToList() ansiiprint.echo('Successfully started VM', '{}:'.format(self.domuuid), 'o') @@ -245,7 +246,7 @@ class VMInstance: ansiiprint.echo('Receiving migration', '{}:'.format(self.domuuid), 'i') while True: time.sleep(0.5) - self.state = self.zk_conn.get('/domains/{}/state'.format(self.domuuid))[0].decode('ascii') + self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) self.dom = self.lookupByUUID(self.domuuid) if self.dom == None and self.state == 'migrate': @@ -281,8 +282,8 @@ class VMInstance: time.sleep(0.2) # Get the current values from zookeeper (don't rely on the watch) - self.state = self.zk_conn.get('/domains/{}/state'.format(self.domuuid))[0].decode('ascii') - self.hypervisor = self.zk_conn.get('/domains/{}/hypervisor'.format(self.domuuid))[0].decode('ascii') + self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + self.hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(self.domuuid)) # Check the current state of the VM try: diff --git a/pvcd/zkhandler.py b/pvcd/zkhandler.py new file mode 100644 index 00000000..de651c1c --- /dev/null +++ b/pvcd/zkhandler.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +# zkhandler.py - Secure versioned ZooKeeper updates +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import kazoo.client
import pvcd.ansiiprint as ansiiprint


def _encode(data):
    """Return *data* as bytes, ASCII-encoding it when it is a string."""
    if isinstance(data, bytes):
        return data
    return data.encode('ascii')


def _failed(results):
    """Return True if any transaction result is an exception instance.

    kazoo reports per-operation failures of a committed transaction as
    exception objects inside the result list instead of raising them.
    """
    return any(isinstance(result, Exception) for result in results)


# Child list function
def listchildren(zk_conn, key):
    """Return the names of the children of *key*.

    Args:
        zk_conn: Zookeeper client connection providing ``get_children``.
        key: Path of the parent node.

    Returns:
        Whatever ``zk_conn.get_children(key)`` returns (the child names).
    """
    return zk_conn.get_children(key)


# Data read function
def readdata(zk_conn, key):
    """Return the value stored at *key*, decoded as an ASCII string.

    Args:
        zk_conn: Zookeeper client connection providing ``get``.
        key: Path of the node to read.

    Returns:
        The node's data decoded from ASCII.
    """
    # get() returns a (data, metadata) pair; only the data is needed here.
    return zk_conn.get(key)[0].decode('ascii')


# Data write function
def writedata(zk_conn, key, data):
    """Replace the value stored at *key*, guarded by a version check.

    The node's current version is read first, and the write is committed in
    a transaction that also checks that version. The update is therefore
    rejected if the node was modified between the read and the commit.

    Args:
        zk_conn: Zookeeper client connection.
        key: Path of the node to update.
        data: New value; a string (written ASCII-encoded) or bytes. The value
            is written as a whole, not character by character.

    Returns:
        0 on success, 1 if the key does not exist or the transaction failed.
    """
    # Local import: kazoo is already a dependency of this module, and this
    # keeps the new name out of the module-level import list.
    from kazoo.exceptions import NoNodeError

    # Get the current version; get() raises NoNodeError for a missing key
    try:
        meta = zk_conn.get(key)[1]
    except NoNodeError:
        meta = None
    if meta is None:
        ansiiprint.echo('Zookeeper key "{}" does not exist'.format(key), '', 'e')
        return 1

    zk_transaction = zk_conn.transaction()
    # The check has to come before the write and use the version just read:
    # operations in a transaction are evaluated in order, and the write
    # itself bumps the node's version.
    zk_transaction.check(key, meta.version)
    zk_transaction.set_data(key, _encode(data))
    if _failed(zk_transaction.commit()):
        ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(key), '', 'e')
        return 1
    return 0


# Key create function
def createkey(zk_conn, key, data):
    """Create the node *key* holding *data*.

    Args:
        zk_conn: Zookeeper client connection.
        key: Path of the node to create.
        data: Initial value; a string (written ASCII-encoded) or bytes. The
            value is stored as a whole, not character by character.

    Returns:
        0 on success, 1 if the transaction reported a failure.
    """
    zk_transaction = zk_conn.transaction()
    zk_transaction.create(key, _encode(data))
    if _failed(zk_transaction.commit()):
        ansiiprint.echo('Zookeeper key "{}" could not be created'.format(key), '', 'e')
        return 1
    return 0