diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index d5c2b856..12e551a8 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -830,7 +830,7 @@ def update_nodes(new_node_list):
     # Add any missing nodes to the list
     for node in new_node_list:
         if node not in node_list:
-            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler.zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
+            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
 
     # Remove any deleted nodes from the list
     for node in node_list:
diff --git a/node-daemon/pvcnoded/NodeInstance.py b/node-daemon/pvcnoded/NodeInstance.py
index 72bcc878..e28a5f86 100644
--- a/node-daemon/pvcnoded/NodeInstance.py
+++ b/node-daemon/pvcnoded/NodeInstance.py
@@ -23,23 +23,22 @@ import time
 
 from threading import Thread
 
-import pvcnoded.zkhandler as zkhandler
 import pvcnoded.common as common
 
 
 class NodeInstance(object):
     # Initialization function
-    def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
+    def __init__(self, name, this_node, zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
         # Passed-in variables on creation
         self.name = name
         self.this_node = this_node
-        self.zk_conn = zk_conn
+        self.zkhandler = zkhandler
         self.config = config
         self.logger = logger
         # Which node is primary
         self.primary_node = None
         # States
-        self.daemon_mode = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonmode'.format(self.name))
+        self.daemon_mode = self.zkhandler.read('/nodes/{}/daemonmode'.format(self.name))
         self.daemon_state = 'stop'
         self.router_state = 'client'
         self.domain_state = 'ready'
@@ -91,7 +90,7 @@ class NodeInstance(object):
         self.flush_stopper = False
 
         # Zookeeper handlers for changed states
-        @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
         def watch_node_daemonstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -106,7 +105,7 @@ class NodeInstance(object):
             if data != self.daemon_state:
                 self.daemon_state = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
         def watch_node_routerstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -135,9 +134,11 @@ class NodeInstance(object):
                         transition_thread.start()
                     else:
                         # We did nothing, so just become secondary state
-                        zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
+                        self.zkhandler.write([
+                            ('/nodes/{}/routerstate'.format(self.name), 'secondary')
+                        ])
 
-        @self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
         def watch_node_domainstate(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -170,7 +171,7 @@ class NodeInstance(object):
                 self.flush_thread = Thread(target=self.unflush, args=(), kwargs={})
                 self.flush_thread.start()
 
-        @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
         def watch_node_memfree(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -185,7 +186,7 @@ class NodeInstance(object):
             if data != self.memfree:
                 self.memfree = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
         def watch_node_memused(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -200,7 +201,7 @@ class NodeInstance(object):
             if data != self.memused:
                 self.memused = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
         def watch_node_memalloc(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -215,7 +216,7 @@ class NodeInstance(object):
             if data != self.memalloc:
                 self.memalloc = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
         def watch_node_vcpualloc(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -230,7 +231,7 @@ class NodeInstance(object):
             if data != self.vcpualloc:
                 self.vcpualloc = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
         def watch_node_runningdomains(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -245,7 +246,7 @@ class NodeInstance(object):
             if data != self.domain_list:
                 self.domain_list = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
+        @self.zkhandler.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
         def watch_node_domainscount(data, stat, event=''):
             if event and event.type == 'DELETED':
                 # The key has been deleted after existing before; terminate this watcher
@@ -323,26 +324,30 @@ class NodeInstance(object):
         Acquire primary coordinator status from a peer node
         """
         # Lock the primary node until transition is complete
-        primary_lock = zkhandler.exclusivelock(self.zk_conn, '/config/primary_node')
+        primary_lock = self.zkhandler.exclusivelock('/config/primary_node')
         primary_lock.acquire()
 
         # Ensure our lock key is populated
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
 
         # Synchronize nodes A (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase A', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase A', state='o')
         time.sleep(1)  # Time fir reader to acquire the lock
         self.logger.out('Releasing write lock for synchronization phase A', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase A', state='o')
         time.sleep(0.1)  # Time fir new writer to acquire the lock
 
         # Synchronize nodes B (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase B', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase B', state='o')
@@ -351,7 +356,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase B', state='o')
 
         # Synchronize nodes C (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase C', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase C', state='o')
@@ -367,12 +372,14 @@ class NodeInstance(object):
         )
         common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
         self.logger.out('Releasing write lock for synchronization phase C', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase C', state='o')
 
         # Synchronize nodes D (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase D', state='o')
@@ -397,12 +404,14 @@ class NodeInstance(object):
         )
         common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
         self.logger.out('Releasing write lock for synchronization phase D', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase D', state='o')
 
         # Synchronize nodes E (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase E', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase E', state='o')
@@ -418,12 +427,14 @@ class NodeInstance(object):
         )
         common.createIPAddress('169.254.169.254', '32', 'lo')
         self.logger.out('Releasing write lock for synchronization phase E', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase E', state='o')
 
         # Synchronize nodes F (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase F', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase F', state='o')
@@ -432,12 +443,14 @@ class NodeInstance(object):
         for network in self.d_network:
             self.d_network[network].createGateways()
         self.logger.out('Releasing write lock for synchronization phase F', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase F', state='o')
 
         # Synchronize nodes G (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase G', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase G', state='o')
@@ -504,14 +517,18 @@ class NodeInstance(object):
         else:
             self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e')
         self.logger.out('Releasing write lock for synchronization phase G', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase G', state='o')
 
         # Wait 2 seconds for everything to stabilize before we declare all-done
         time.sleep(2)
         primary_lock.release()
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'primary'})
+        self.zkhandler.write([
+            ('/nodes/{}/routerstate'.format(self.name), 'primary')
+        ])
         self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
 
     def become_secondary(self):
@@ -521,7 +538,7 @@ class NodeInstance(object):
         time.sleep(0.2)  # Initial delay for the first writer to grab the lock
 
         # Synchronize nodes A (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase A', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase A', state='o')
@@ -530,7 +547,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase A', state='o')
 
         # Synchronize nodes B (I am writer)
-        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.writelock('/locks/primary_node')
         self.logger.out('Acquiring write lock for synchronization phase B', state='i')
         lock.acquire()
         self.logger.out('Acquired write lock for synchronization phase B', state='o')
@@ -541,7 +558,9 @@ class NodeInstance(object):
         for network in self.d_network:
             self.d_network[network].stopDHCPServer()
         self.logger.out('Releasing write lock for synchronization phase B', state='i')
-        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        self.zkhandler.write([
+            ('/locks/primary_node', '')
+        ])
         lock.release()
         self.logger.out('Released write lock for synchronization phase B', state='o')
         # 3. Stop client API
@@ -553,7 +572,7 @@ class NodeInstance(object):
         time.sleep(0.1)  # Time fir new writer to acquire the lock
 
         # Synchronize nodes C (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase C', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase C', state='o')
@@ -572,7 +591,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase C', state='o')
 
         # Synchronize nodes D (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase D', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase D', state='o')
@@ -600,7 +619,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase D', state='o')
 
         # Synchronize nodes E (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase E', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase E', state='o')
@@ -619,7 +638,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase E', state='o')
 
         # Synchronize nodes F (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase F', state='i')
         lock.acquire()
         self.logger.out('Acquired read lock for synchronization phase F', state='o')
@@ -631,7 +650,7 @@ class NodeInstance(object):
         self.logger.out('Released read lock for synchronization phase F', state='o')
 
         # Synchronize nodes G (I am reader)
-        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        lock = self.zkhandler.readlock('/locks/primary_node')
         self.logger.out('Acquiring read lock for synchronization phase G', state='i')
         try:
             lock.acquire(timeout=60)  # Don't wait forever and completely block us
@@ -644,7 +663,9 @@ class NodeInstance(object):
         # Wait 2 seconds for everything to stabilize before we declare all-done
         time.sleep(2)
 
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/routerstate'.format(self.name): 'secondary'})
+        self.zkhandler.write([
+            ('/nodes/{}/routerstate'.format(self.name), 'secondary')
+        ])
         self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
 
     # Flush all VMs on the host
@@ -664,38 +685,42 @@ class NodeInstance(object):
             self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
 
             # Don't replace the previous node if the VM is already migrated
-            if zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)):
-                current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
+            if self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)):
+                current_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
             else:
-                current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
+                current_node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid))
 
-            target_node = common.findTargetNode(self.zk_conn, self.config, self.logger, dom_uuid)
+            target_node = common.findTargetNode(self.zkhandler, self.config, self.logger, dom_uuid)
             if target_node == current_node:
                 target_node = None
 
             if target_node is None:
                 self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
-                zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(dom_uuid): 'shutdown'})
-                zkhandler.writedata(self.zk_conn, {'/domains/{}/node_autostart'.format(dom_uuid): 'True'})
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'shutdown'),
+                    ('/domains/{}/node_autostart'.format(dom_uuid), 'True')
+                ])
             else:
                 self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
-                zkhandler.writedata(self.zk_conn, {
-                    '/domains/{}/state'.format(dom_uuid): 'migrate',
-                    '/domains/{}/node'.format(dom_uuid): target_node,
-                    '/domains/{}/lastnode'.format(dom_uuid): current_node
-                })
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'migrate'),
+                    ('/domains/{}/node'.format(dom_uuid), target_node),
+                    ('/domains/{}/lastnode'.format(dom_uuid), current_node)
+                ])
 
             # Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
             ticks = 0
-            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
+            while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
                 ticks += 1
                 if ticks > 600:
                     # Abort if we've waited for 120 seconds, the VM is messed and just continue
                     break
                 time.sleep(0.2)
 
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.name): ''})
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/domainstate'.format(self.name): 'flushed'})
+        self.zkhandler.write([
+            ('/nodes/{}/runningdomains'.format(self.name), ''),
+            ('/nodes/{}/domainstate'.format(self.name), 'flushed')
+        ])
         self.flush_thread = None
         self.flush_stopper = False
         return
@@ -712,20 +737,20 @@ class NodeInstance(object):
                 return
 
             # Handle autostarts
-            autostart = zkhandler.readdata(self.zk_conn, '/domains/{}/node_autostart'.format(dom_uuid))
-            node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
+            autostart = self.zkhandler.read('/domains/{}/node_autostart'.format(dom_uuid))
+            node = self.zkhandler.read('/domains/{}/node'.format(dom_uuid))
             if autostart == 'True' and node == self.name:
                 self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
-                zkhandler.writedata(self.zk_conn, {
-                    '/domains/{}/state'.format(dom_uuid): 'start',
-                    '/domains/{}/node'.format(dom_uuid): self.name,
-                    '/domains/{}/lastnode'.format(dom_uuid): '',
-                    '/domains/{}/node_autostart'.format(dom_uuid): 'False'
-                })
+                self.zkhandler.write([
+                    ('/domains/{}/state'.format(dom_uuid), 'start'),
+                    ('/domains/{}/node'.format(dom_uuid), self.name),
+                    ('/domains/{}/lastnode'.format(dom_uuid), ''),
+                    ('/domains/{}/node_autostart'.format(dom_uuid), 'False')
+                ])
                 continue
 
             try:
-                last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
+                last_node = self.zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
             except Exception:
                 continue
 
@@ -733,17 +758,19 @@ class NodeInstance(object):
                 continue
 
             self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
-            zkhandler.writedata(self.zk_conn, {
-                '/domains/{}/state'.format(dom_uuid): 'migrate',
-                '/domains/{}/node'.format(dom_uuid): self.name,
-                '/domains/{}/lastnode'.format(dom_uuid): ''
-            })
+            self.zkhandler.write([
+                ('/domains/{}/state'.format(dom_uuid), 'migrate'),
+                ('/domains/{}/node'.format(dom_uuid), self.name),
+                ('/domains/{}/lastnode'.format(dom_uuid), '')
+            ])
 
             # Wait for the VM to migrate back
-            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
+            while self.zkhandler.read('/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
                 time.sleep(0.1)
 
-        zkhandler.writedata(self.zk_conn, {'/nodes/{}/domainstate'.format(self.name): 'ready'})
+        self.zkhandler.write([
+            ('/nodes/{}/domainstate'.format(self.name), 'ready')
+        ])
         self.flush_thread = None
         self.flush_stopper = False
         return
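
Note: every call site in this patch assumes a ZKHandler object exposing read(), write() (which takes a list of (key, value) tuples rather than the old dict), readlock(), writelock(), exclusivelock(), and a raw zk_conn attribute for the kazoo DataWatch decorators. That class lives elsewhere in the pvcnoded tree and is not part of this diff, so the sketch below is only an illustration of the interface these hunks rely on; the kazoo-based method bodies and the config['coordinators'] host list are assumptions, not the actual implementation.

    from kazoo.client import KazooClient


    class ZKHandler(object):
        def __init__(self, config):
            # Expose the raw client as zk_conn so the existing
            # @zkhandler.zk_conn.DataWatch(...) decorators keep working
            self.zk_conn = KazooClient(hosts=config['coordinators'])
            self.zk_conn.start()

        def read(self, key):
            # Stand-in for the old zkhandler.readdata(zk_conn, key):
            # return the data of a key as a string
            return self.zk_conn.get(key)[0].decode('utf8')

        def write(self, kvpairs):
            # Stand-in for the old zkhandler.writedata(zk_conn, {k: v}):
            # apply all (key, value) tuples in a single transaction
            transaction = self.zk_conn.transaction()
            for key, value in kvpairs:
                if self.zk_conn.exists(key):
                    transaction.set_data(key, str(value).encode('utf8'))
                else:
                    transaction.create(key, str(value).encode('utf8'))
            return transaction.commit()

        def readlock(self, key):
            # Stand-in for the old zkhandler.readlock(zk_conn, key)
            return self.zk_conn.ReadLock(key)

        def writelock(self, key):
            # Stand-in for the old zkhandler.writelock(zk_conn, key)
            return self.zk_conn.WriteLock(key)

        def exclusivelock(self, key):
            # Stand-in for the old zkhandler.exclusivelock(zk_conn, key)
            return self.zk_conn.Lock(key)

This shape explains the mechanical changes above: each module-level zkhandler.*(self.zk_conn, ...) call becomes a method call on the passed-in handler, and the dict argument to writedata() becomes an ordered list of tuples to write().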