Patch for node-daemon/pvcd/NodeInstance.py: phased, lock-synchronized
primary/secondary coordinator transitions.

What the hunks below do:
  * In the handler that updates self.router_state, run become_primary() /
    become_secondary() in a background threading.Thread, and only when
    self.daemon_state == 'run'.
  * Replace the one-shot become_primary()/become_secondary() bodies (and the
    createFloatingAddresses()/removeFloatingAddresses() helpers) with steps
    A-G, each taken under a read or write lock on '/locks/primary_node'.

Review notes:
  * Leading indentation, blank context lines and diagram column spacing were
    reconstructed from a whitespace-collapsed copy of this patch. Hunk line
    counts were re-checked against the @@ headers, but confirm the context
    whitespace against the target file before applying.
  * Step 9 of become_primary() logs 'Starting PVC API client service'; the
    collapsed copy logged 'Stopping ...' right before `systemctl start`.
  * In the phase diagram, the sub-steps of "6. Start DHCP servers" are
    labelled 6a/6b; the collapsed copy labelled them 5a/5b.

diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py
index 3b6b0867..15fdc028 100644
--- a/node-daemon/pvcd/NodeInstance.py
+++ b/node-daemon/pvcd/NodeInstance.py
@@ -118,9 +118,21 @@ class NodeInstance(object):
                     self.router_state = data
                     if self.config['enable_networking']:
                         if self.router_state == 'primary':
-                            self.become_primary()
+                            # Skip becoming primary unless already running
+                            if self.daemon_state == 'run':
+                                self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
+                                #self.become_primary()
+                                transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={})
+                                transition_thread.start()
+                                #transition_thread.join()
                         else:
-                            self.become_secondary()
+                            # Skip becoming secondary unless already running
+                            if self.daemon_state == 'run':
+                                self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
+                                #self.become_secondary()
+                                transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={})
+                                transition_thread.start()
+                                #transition_thread.join()
 
         @self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
         def watch_node_domainstate(data, stat, event=''):
@@ -259,118 +271,89 @@ class NodeInstance(object):
     def update_domain_list(self, d_domain):
         self.d_domain = d_domain
 
-    # Routing primary/secondary states
-    def become_secondary(self):
-        if self.daemon_state == 'init':
-            return
-
-        self.logger.out('Setting router {} to secondary state'.format(self.name), state='i')
-        self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
-
-        time.sleep(1)
-
-        if self.config['enable_api']:
-            self.logger.out('Stopping PVC API client service', state='i')
-            common.run_os_command("systemctl stop pvc-api.service")
-        for network in self.d_network:
-            self.d_network[network].stopDHCPServer()
-            self.d_network[network].removeGateways()
-        self.dns_aggregator.stop_aggregator()
-        self.metadata_api.stop()
-        self.removeFloatingAddresses()
-
+    ######
+    # Phases of node transition
+    #
+    # Current Primary                   Candidate Secondary
+    #   -> secondary                      -> primary
+    #
+    # def become_secondary()            def become_primary()
+    #
+    # A ----------------------------------------------------------------- SYNC (candidate)
+    # B ----------------------------------------------------------------- SYNC (current)
+    # 1. Stop client API                                                ||
+    # 2. Stop metadata API                                              ||
+    # 3. Stop DNS aggregator                                            ||
+    # 4. Stop DHCP servers                                              ||
+    #   4a) network 1                                                   ||
+    #   4b) network 2                                                   ||
+    #   etc.                                                            ||
+    #                                                                   --
+    # C ----------------------------------------------------------------- SYNC (candidate)
+    # 5. Remove upstream floating IP    1. Add upstream floating IP     ||
+    #                                                                   --
+    # D ----------------------------------------------------------------- SYNC (candidate)
+    # 6. Remove cluster floating IP     2. Add cluster floating IP      ||
+    #                                                                   --
+    # E ----------------------------------------------------------------- SYNC (candidate)
+    # 7. Remove metadata floating IP    3. Add metadata floating IP     ||
+    #                                                                   --
+    # F ----------------------------------------------------------------- SYNC (candidate)
+    # 8. Remove gateway IPs             4. Add gateway IPs              ||
+    #   8a) network 1                     4a) network 1                 ||
+    #   8b) network 2                     4b) network 2                 ||
+    #   etc.                              etc.                          ||
+    #                                                                   --
+    # G ----------------------------------------------------------------- SYNC (candidate)
+    #                                   5. Transition Patroni primary   ||
+    #                                   6. Start DHCP servers           ||
+    #                                     6a) network 1                 ||
+    #                                     6b) network 2                 ||
+    #                                     etc.                          ||
+    #                                   7. Start DNS aggregator         ||
+    #                                   8. Start metadata API           ||
+    #                                   9. Start client API             ||
+    #                                                                   --
+    ######
     def become_primary(self):
-        # Establish a lock
-        with zkhandler.writelock(self.zk_conn, '/primary_node'):
-            self.logger.out('Setting router {} to primary state'.format(self.name), state='i')
-            self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
+        """
+        Acquire primary coordinator status from a peer node
+        """
+        # Lock the primary node until transition is complete
+        primary_lock = zkhandler.writelock(self.zk_conn, '/primary_node')
+        primary_lock.acquire()
 
-            # Create floating addresses
-            self.createFloatingAddresses()
-            # Start up the gateways and DHCP servers
-            for network in self.d_network:
-                self.d_network[network].createGateways()
-                self.d_network[network].startDHCPServer()
+        # Ensure our lock key is populated
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
 
-            time.sleep(1)
+        # Synchronize nodes A (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization A', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization A', state='o')
+        time.sleep(1)  # Time for reader to acquire the lock
+        self.logger.out('Releasing write lock for synchronization A', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization A', state='o')
+        time.sleep(0.1)  # Time for new writer to acquire the lock
 
-            self.logger.out('Setting Patroni leader to this node', state='i')
-            tick = 1
-            # As long as we're primary, keep trying to set the Patroni leader to us
-            while self.router_state == 'primary':
-                # Switch Patroni leader to the local instance
-                retcode, stdout, stderr = common.run_os_command(
-                    """
-                    patronictl
-                        -c /etc/patroni/config.yml
-                        -d zookeeper://localhost:2181
-                        switchover
-                        --candidate {}
-                        --force
-                        pvcdns
-                    """.format(self.name)
-                )
+        # Synchronize nodes B (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization B', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization B', state='o')
+        self.logger.out('Releasing read lock for synchronization B', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization B', state='o')
 
-                # Combine the stdout and stderr and strip the output
-                # Patronictl's output is pretty junky
-                if stderr:
-                    stdout += stderr
-                stdout = stdout.strip()
-
-                # Handle our current Patroni leader being us
-                if stdout and stdout.split('\n')[-1].split() == ["Error:", "Switchover", "target", "and", "source", "are", "the", "same."]:
-                    self.logger.out('Failed to switch Patroni leader to ourselves; this is fine\n{}'.format(stdout), state='w')
-                    break
-                # Handle a failed switchover
-                elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]):
-                    self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout, state='e'))
-                    tick += 1
-                    if tick > 5:
-                        self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e')
-                        break
-                    else:
-                        time.sleep(5)
-                # Otherwise, we succeeded
-                else:
-                    self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
-                    break
-
-            # Start the DNS aggregator instance
-            time.sleep(1)
-            self.dns_aggregator.start_aggregator()
-            self.metadata_api.start()
-
-            # Start the clients
-            if self.config['enable_api']:
-                self.logger.out('Starting PVC API client service', state='i')
-                common.run_os_command("systemctl start pvc-api.service")
-                self.logger.out('Starting PVC Provisioner Worker service', state='i')
-                common.run_os_command("systemctl start pvc-provisioner-worker.service")
-
-    def createFloatingAddresses(self):
-        # Metadata link-local IP
-        self.logger.out(
-            'Creating Metadata link-local IP {}/{} on interface {}'.format(
-                '169.254.169.254',
-                '32',
-                'lo'
-            ),
-            state='o'
-        )
-        common.createIPAddress('169.254.169.254', '32', 'lo')
-
-        # VNI floating IP
-        self.logger.out(
-            'Creating floating management IP {}/{} on interface {}'.format(
-                self.vni_ipaddr,
-                self.vni_cidrnetmask,
-                'brcluster'
-            ),
-            state='o'
-        )
-        common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
-
-        # Upstream floating IP
+        # Synchronize nodes C (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization C', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization C', state='o')
+        time.sleep(0.5)  # Time for reader to acquire the lock
+        # 1. Add Upstream floating IP
         self.logger.out(
             'Creating floating upstream IP {}/{} on interface {}'.format(
                 self.upstream_ipaddr,
@@ -380,31 +363,180 @@ class NodeInstance(object):
             state='o'
         )
         common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)
+        self.logger.out('Releasing write lock for synchronization C', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization C', state='o')
 
-    def removeFloatingAddresses(self):
-        # Metadata link-local IP
+        # Synchronize nodes D (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization D', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization D', state='o')
+        time.sleep(0.2)  # Time for reader to acquire the lock
+        # 2. Add Cluster floating IP
         self.logger.out(
-            'Removing Metadata link-local IP {}/{} from interface {}'.format(
-                '169.254.169.254',
-                '32',
-                'lo'
-            ),
-            state='o'
-        )
-        common.removeIPAddress('169.254.169.254', '32', 'lo')
-
-        # VNI floating IP
-        self.logger.out(
-            'Removing floating management IP {}/{} from interface {}'.format(
+            'Creating floating management IP {}/{} on interface {}'.format(
                 self.vni_ipaddr,
                 self.vni_cidrnetmask,
                 'brcluster'
             ),
             state='o'
         )
-        common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        self.logger.out('Releasing write lock for synchronization D', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization D', state='o')
 
-        # Upstream floating IP
+        # Synchronize nodes E (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization E', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization E', state='o')
+        time.sleep(0.2)  # Time for reader to acquire the lock
+        # 3. Add Metadata link-local IP
+        self.logger.out(
+            'Creating Metadata link-local IP {}/{} on interface {}'.format(
+                '169.254.169.254',
+                '32',
+                'lo'
+            ),
+            state='o'
+        )
+        common.createIPAddress('169.254.169.254', '32', 'lo')
+        self.logger.out('Releasing write lock for synchronization E', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization E', state='o')
+
+        # Synchronize nodes F (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization F', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization F', state='o')
+        time.sleep(0.2)  # Time for reader to acquire the lock
+        # 4. Add gateway IPs
+        for network in self.d_network:
+            self.d_network[network].createGateways()
+        self.logger.out('Releasing write lock for synchronization F', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization F', state='o')
+
+        # Synchronize nodes G (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization G', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization G', state='o')
+        time.sleep(0.2)  # Time for reader to acquire the lock
+        # 5. Transition Patroni primary
+        self.logger.out('Setting Patroni leader to this node', state='i')
+        tick = 1
+        # As long as we're primary, keep trying to set the Patroni leader to us
+        while self.router_state == 'primary':
+            # Switch Patroni leader to the local instance
+            retcode, stdout, stderr = common.run_os_command(
+                """
+                patronictl
+                    -c /etc/patroni/config.yml
+                    -d zookeeper://localhost:2181
+                    switchover
+                    --candidate {}
+                    --force
+                    pvcdns
+                """.format(self.name)
+            )
+
+            # Combine the stdout and stderr and strip the output
+            # Patronictl's output is pretty junky
+            if stderr:
+                stdout += stderr
+            stdout = stdout.strip()
+
+            # Handle our current Patroni leader being us
+            if stdout and stdout.split('\n')[-1].split() == ["Error:", "Switchover", "target", "and", "source", "are", "the", "same."]:
+                self.logger.out('Failed to switch Patroni leader to ourselves; this is fine\n{}'.format(stdout), state='w')
+                break
+            # Handle a failed switchover
+            elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]):
+                if tick > 4:
+                    self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e')
+                    break
+                else:
+                    self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout), state='e')
+                    tick += 1
+                    time.sleep(5)
+            # Otherwise, we succeeded
+            else:
+                self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
+                time.sleep(0.2)
+                break
+        # 6. Start DHCP servers
+        for network in self.d_network:
+            self.d_network[network].startDHCPServer()
+        # 7. Start DNS aggregator
+        self.dns_aggregator.start_aggregator()
+        # 8. Start metadata API
+        self.metadata_api.start()
+        # 9. Start client API (and provisioner worker)
+        if self.config['enable_api']:
+            self.logger.out('Starting PVC API client service', state='i')
+            common.run_os_command("systemctl start pvc-api.service")
+            self.logger.out('Starting PVC Provisioner Worker service', state='i')
+            common.run_os_command("systemctl start pvc-provisioner-worker.service")
+        self.logger.out('Releasing write lock for synchronization G', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization G', state='o')
+
+        primary_lock.release()
+        self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
+
+    def become_secondary(self):
+        """
+        Relinquish primary coordinator status to a peer node
+        """
+        time.sleep(0.2)  # Initial delay for the first writer to grab the lock
+
+        # Synchronize nodes A (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization A', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization A', state='o')
+        self.logger.out('Releasing read lock for synchronization A', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization A', state='o')
+
+        # Synchronize nodes B (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring write lock for synchronization B', state='i')
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization B', state='o')
+        time.sleep(0.2)  # Time for reader to acquire the lock
+        # 1. Stop client API
+        if self.config['enable_api']:
+            self.logger.out('Stopping PVC API client service', state='i')
+            common.run_os_command("systemctl stop pvc-api.service")
+        # 2. Stop metadata API
+        self.metadata_api.stop()
+        # 3. Stop DNS aggregator
+        self.dns_aggregator.stop_aggregator()
+        # 4. Stop DHCP servers
+        for network in self.d_network:
+            self.d_network[network].stopDHCPServer()
+        self.logger.out('Releasing write lock for synchronization B', state='i')
+        zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
+        lock.release()
+        self.logger.out('Released write lock for synchronization B', state='o')
+        time.sleep(0.1)  # Time for new writer to acquire the lock
+
+        # Synchronize nodes C (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization C', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization C', state='o')
+        # 5. Remove Upstream floating IP
         self.logger.out(
             'Removing floating upstream IP {}/{} from interface {}'.format(
                 self.upstream_ipaddr,
@@ -414,6 +546,70 @@ class NodeInstance(object):
             state='o'
         )
         common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)
+        self.logger.out('Releasing read lock for synchronization C', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization C', state='o')
+
+        # Synchronize nodes D (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization D', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization D', state='o')
+        # 6. Remove Cluster floating IP
+        self.logger.out(
+            'Removing floating management IP {}/{} from interface {}'.format(
+                self.vni_ipaddr,
+                self.vni_cidrnetmask,
+                'brcluster'
+            ),
+            state='o'
+        )
+        common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
+        self.logger.out('Releasing read lock for synchronization D', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization D', state='o')
+
+        # Synchronize nodes E (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization E', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization E', state='o')
+        # 7. Remove Metadata link-local IP
+        self.logger.out(
+            'Removing Metadata link-local IP {}/{} from interface {}'.format(
+                '169.254.169.254',
+                '32',
+                'lo'
+            ),
+            state='o'
+        )
+        common.removeIPAddress('169.254.169.254', '32', 'lo')
+        self.logger.out('Releasing read lock for synchronization E', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization E', state='o')
+
+        # Synchronize nodes F (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization F', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization F', state='o')
+        # 8. Remove gateway IPs
+        for network in self.d_network:
+            self.d_network[network].removeGateways()
+        self.logger.out('Releasing read lock for synchronization F', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization F', state='o')
+
+        # Synchronize nodes G (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
+        self.logger.out('Acquiring read lock for synchronization G', state='i')
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization G', state='o')
+        self.logger.out('Releasing read lock for synchronization G', state='i')
+        lock.release()
+        self.logger.out('Released read lock for synchronization G', state='o')
+
+        self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
 
     # Flush all VMs on the host
     def flush(self):