diff --git a/NodeInstance.py b/NodeInstance.py index d8f0d895..d71df573 100644 --- a/NodeInstance.py +++ b/NodeInstance.py @@ -20,17 +20,15 @@ # ############################################################################### -import os, sys, socket, time, threading, libvirt, kazoo.client +import os, sys, socket, time, libvirt, kazoo.client -class NodeInstance(threading.Thread): +class NodeInstance(): def __init__(self, name, t_node, s_domain, zk): - super(NodeInstance, self).__init__() # Passed-in variables on creation self.zkey = '/nodes/%s' % name self.zk = zk self.name = name self.state = 'stop' - self.stop_thread = threading.Event() self.t_node = t_node self.active_node_list = [] self.flushed_node_list = [] @@ -88,10 +86,6 @@ class NodeInstance(threading.Thread): def updatedomainlist(self, s_domain): self.s_domain = s_domain - # Shutdown the thread - def stop(self): - self.stop_thread.set() - # Flush all VMs on the host def flush(self): for domain in self.domain_list: @@ -143,11 +137,7 @@ class NodeInstance(threading.Thread): self.zk.set("/nodes/" + self.name + "/state", 'start'.encode('ascii')) - def run(self): - if self.name == socket.gethostname(): - self.setup_local_node() - - def setup_local_node(self): + def update_zookeeper(self): # Connect to libvirt libvirt_name = "qemu:///system" conn = libvirt.open(libvirt_name) @@ -170,87 +160,79 @@ class NodeInstance(threading.Thread): else: self.state = 'flush' - while True: - # Toggle state management of all VMs and remove any non-running VMs - for domain, instance in self.s_domain.items(): - if instance.inshutdown == False and domain in self.domain_list: - instance.manage_vm_state() - if instance.dom == None: + # Toggle state management of all VMs and remove any non-running VMs + for domain, instance in self.s_domain.items(): + if instance.inshutdown == False and domain in self.domain_list: + instance.manage_vm_state() + if instance.dom == None: + try: + self.domain_list.remove(domain) + except: + pass + else: + try: + state = instance.dom.state()[0] + except: + state = libvirt.VIR_DOMAIN_NOSTATE + + if state != libvirt.VIR_DOMAIN_RUNNING: try: self.domain_list.remove(domain) except: pass - else: - try: - state = instance.dom.state()[0] - except: - state = libvirt.VIR_DOMAIN_NOSTATE - - if state != libvirt.VIR_DOMAIN_RUNNING: - try: - self.domain_list.remove(domain) - except: - pass - # Set our information in zookeeper - self.memfree = conn.getFreeMemory() - self.cpuload = os.getloadavg()[0] - try: - self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii')) - self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii')) - self.zk.set(self.zkey + '/runningdomains', ' '.join(self.domain_list).encode('ascii')) - except: - if self.stop_thread.is_set(): - return + # Set our information in zookeeper + self.memfree = conn.getFreeMemory() + self.cpuload = os.getloadavg()[0] + try: + self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii')) + self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii')) + self.zk.set(self.zkey + '/runningdomains', ' '.join(self.domain_list).encode('ascii')) + except: + return - print(">>> %s - Free memory: %s | Load: %s" % ( time.strftime("%d/%m/%Y %H:%M:%S"), self.memfree, self.cpuload )) - print("Active domains: %s" % self.domain_list) + print(">>> %s - Free memory: %s | Load: %s" % ( time.strftime("%d/%m/%Y %H:%M:%S"), self.memfree, self.cpuload )) + print("Active domains: %s" % self.domain_list) - # Update our local node lists - for node_name in self.t_node: + # Update our local node lists + for node_name in self.t_node: + try: + state, stat = self.zk.get('/nodes/%s/state' % node_name) + node_state = state.decode('ascii') + except: + node_state = 'stop' + + if node_state == 'start' and node_name not in self.active_node_list: + self.active_node_list.append(node_name) try: - state, stat = self.zk.get('/nodes/%s/state' % node_name) - node_state = state.decode('ascii') - except: - node_state = 'stop' - - if node_state == 'start' and node_name not in self.active_node_list: - self.active_node_list.append(node_name) - try: - self.flushed_node_list.remove(node_name) - except ValueError: - pass - try: - self.inactive_node_list.remove(node_name) - except ValueError: - pass - if node_state == 'flush' and node_name not in self.flushed_node_list: - self.flushed_node_list.append(node_name) - try: - self.active_node_list.remove(node_name) - except ValueError: - pass - try: - self.inactive_node_list.remove(node_name) - except ValueError: - pass - if node_state != 'start' and node_state != 'flush' and node_name not in self.inactive_node_list: - self.inactive_node_list.append(node_name) - try: - self.active_node_list.remove(node_name) - except ValueError: - pass - try: - self.flushed_node_list.remove(node_name) - except ValueError: - pass - - print('Active nodes: %s' % self.active_node_list) - print('Flushed nodes: %s' % self.flushed_node_list) - print('Inactive nodes: %s' % self.inactive_node_list) - - # Sleep for 9s but with quick interruptability - for x in range(0,90): - time.sleep(0.1) - if self.stop_thread.is_set(): - sys.exit(0) + self.flushed_node_list.remove(node_name) + except ValueError: + pass + try: + self.inactive_node_list.remove(node_name) + except ValueError: + pass + if node_state == 'flush' and node_name not in self.flushed_node_list: + self.flushed_node_list.append(node_name) + try: + self.active_node_list.remove(node_name) + except ValueError: + pass + try: + self.inactive_node_list.remove(node_name) + except ValueError: + pass + if node_state != 'start' and node_state != 'flush' and node_name not in self.inactive_node_list: + self.inactive_node_list.append(node_name) + try: + self.active_node_list.remove(node_name) + except ValueError: + pass + try: + self.flushed_node_list.remove(node_name) + except ValueError: + pass + + print('Active nodes: %s' % self.active_node_list) + print('Flushed nodes: %s' % self.flushed_node_list) + print('Inactive nodes: %s' % self.inactive_node_list) diff --git a/pvcd.py b/pvcd.py index 479014c6..88f86aaa 100755 --- a/pvcd.py +++ b/pvcd.py @@ -20,8 +20,7 @@ # ############################################################################### -from kazoo.client import KazooClient -from kazoo.client import KazooState +import kazoo.client import libvirt import sys import socket @@ -29,8 +28,8 @@ import uuid import VMInstance import NodeInstance import time -import threading import atexit +import apscheduler.schedulers.background def help(): print("pvcd - Parallel Virtual Cluster management daemon") @@ -38,8 +37,8 @@ def help(): help() -# Connect to zookeeper -zk = KazooClient(hosts='127.0.0.1:2181') +# Connect to local zookeeper +zk = kazoo.client.KazooClient(hosts='127.0.0.1:2181') try: zk.start() except: @@ -47,10 +46,10 @@ except: exit(1) def zk_listener(state): - if state == KazooState.LOST: + if state == kazoo.client.KazooState.LOST: cleanup() exit(2) - elif state == KazooState.SUSPENDED: + elif state == kazoo.client.KazooState.SUSPENDED: cleanup() exit(2) else: @@ -63,9 +62,8 @@ myhostname = socket.gethostname() mynodestring = '/nodes/%s' % myhostname def cleanup(): - t_node[myhostname].stop() - time.sleep(0.2) try: + update_timer.shutdown() if t_node[myhostname].getstate() != 'flush': zk.set('/nodes/' + myhostname + '/state', 'stop'.encode('ascii')) zk.stop() @@ -75,6 +73,7 @@ def cleanup(): atexit.register(cleanup) + # Check if our node exists in Zookeeper, and create it if not if zk.exists('%s' % mynodestring): print("Node is present in Zookeeper") @@ -114,13 +113,18 @@ def updatedomains(new_domain_list): if node in t_node: t_node[node].updatedomainlist(s_domain) -t_node[myhostname].start() -time.sleep(0.2) +# Set up our update function +this_node = t_node[myhostname] +update_zookeeper = this_node.update_zookeeper +# Create timer to update this node in Zookeeper +update_timer = apscheduler.schedulers.background.BackgroundScheduler() +update_timer.add_job(update_zookeeper, 'interval', seconds=2) +update_timer.start() + +# Tick loop while True: - # Tick loop try: time.sleep(0.1) except: - cleanup() - exit(0) + break