Move configuration keys to /config tree
@@ -529,6 +529,39 @@ except Exception as e:
     logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
     exit(1)

+# Create the /config key if it does not exist
+try:
+    zkhandler.read('/config')
+except Exception:
+    zkhandler.write([
+        ('/config', ''),
+        ('/config/primary_node', 'none'),
+        ('/config/upstream_ip', 'none'),
+        ('/config/maintenance', 'False'),
+    ])
+
+# MIGRATION - populate the keys from their old values
+try:
+    primary_node = zkhandler.read('/primary_node')
+    zkhandler.write([
+        ('/config/primary_node', primary_node)
+    ])
+except Exception:
+    pass
+try:
+    upstream_ip = zkhandler.read('/upstream_ip')
+    zkhandler.write([
+        ('/config/upstream_ip', upstream_ip)
+    ])
+except Exception:
+    pass
+try:
+    maintenance = zkhandler.read('/maintenance')
+    zkhandler.write([
+        ('/config/maintenance', maintenance)
+    ])
+except Exception:
+    pass

 ###############################################################################
 # PHASE 5 - Gracefully handle termination
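As an aside, the create-then-migrate block above is idempotent: the /config tree is only seeded with defaults when it is missing, and each legacy key is copied best-effort, so a node that already migrated simply rewrites /config/* with identical values. A rough equivalent using the kazoo client directly (the connection string, and using raw kazoo instead of PVC's zkhandler wrapper, are assumptions for illustration):

# Illustrative sketch only -- assumes a reachable ZooKeeper ensemble and uses
# kazoo directly rather than the zkhandler wrapper shown in the diff above.
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed connection string
zk.start()

# Seed the /config tree with safe defaults if it does not exist yet
zk.ensure_path('/config')
for key, default in (('primary_node', 'none'),
                     ('upstream_ip', 'none'),
                     ('maintenance', 'False')):
    path = '/config/' + key
    if zk.exists(path) is None:
        zk.create(path, default.encode('ascii'))

# Best-effort migration: copy each legacy flat key into the new tree
for key in ('primary_node', 'upstream_ip', 'maintenance'):
    try:
        value, _stat = zk.get('/' + key)
        zk.set('/config/' + key, value)
    except NoNodeError:
        pass  # old key never existed; keep the default

zk.stop()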
@@ -566,7 +599,7 @@ def cleanup():
     try:
         if this_node.router_state == 'primary':
             zkhandler.write([
-                ('/primary_node', 'none')
+                ('/config/primary_node', 'none')
             ])
             logger.out('Waiting for primary migration', state='s')
             while this_node.router_state != 'secondary':
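The cleanup path above relinquishes the primary role by writing 'none' to the relocated key and then spins until the watch on that key flips this node to secondary. A hedged sketch of that wait with an explicit bound (the 60-second timeout and the TimeoutError are illustrative, not part of the daemon):

# Sketch: bounded wait for the relinquish to finish. Assumes this_node.router_state
# is updated asynchronously by the /config/primary_node watch shown further below.
import time

def wait_for_secondary(this_node, timeout=60.0, interval=0.5):
    deadline = time.monotonic() + timeout
    while this_node.router_state != 'secondary':
        if time.monotonic() > deadline:
            raise TimeoutError('primary relinquish did not complete in time')
        time.sleep(interval)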
@@ -673,7 +706,7 @@ else:

 # Check that the primary key exists, and create it with us as master if not
 try:
-    current_primary = zkhandler.read('/primary_node')
+    current_primary = zkhandler.read('/config/primary_node')
 except kazoo.exceptions.NoNodeError:
     current_primary = 'none'

@@ -683,7 +716,7 @@ else:
     if config['daemon_mode'] == 'coordinator':
         logger.out('No primary node found; creating with us as primary.', state='i')
         zkhandler.write([
-            ('/primary_node', myhostname)
+            ('/config/primary_node', myhostname)
         ])

 ###############################################################################
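The two hunks above bootstrap the relocated primary key: read /config/primary_node, treat a missing node as 'none', and, if this daemon is a coordinator and no primary exists yet, claim the role with the local hostname. A compact kazoo-only sketch of that check-then-create step (the function name and arguments are illustrative):

# Sketch: seed /config/primary_node when no primary is recorded yet.
# zk is an already-started kazoo KazooClient; myhostname is this node's name.
from kazoo.exceptions import NoNodeError

def seed_primary_key(zk, myhostname, daemon_mode):
    try:
        current_primary = zk.get('/config/primary_node')[0].decode('ascii')
    except NoNodeError:
        current_primary = 'none'

    if current_primary in ('', 'none') and daemon_mode == 'coordinator':
        zk.ensure_path('/config/primary_node')
        zk.set('/config/primary_node', myhostname.encode('ascii'))
    return current_primary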
@@ -819,7 +852,7 @@ this_node = d_node[myhostname]


 # Maintenance mode
-@zkhandler.zk_conn.DataWatch('/maintenance')
+@zkhandler.zk_conn.DataWatch('/config/maintenance')
 def set_maintenance(_maintenance, stat, event=''):
     global maintenance
     try:
@@ -829,7 +862,7 @@ def set_maintenance(_maintenance, stat, event=''):


 # Primary node
-@zkhandler.zk_conn.DataWatch('/primary_node')
+@zkhandler.zk_conn.DataWatch('/config/primary_node')
 def update_primary(new_primary, stat, event=''):
     try:
         new_primary = new_primary.decode('ascii')
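Both watches above rely on kazoo's DataWatch recipe: the decorated function is called once immediately and again on every change, receiving the raw znode bytes (or None when the node is deleted), which is why the handlers decode inside a try/except. A self-contained sketch of the same pattern on the relocated key (host string and print output are illustrative):

# Sketch: a kazoo DataWatch on the relocated primary key.
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed connection string
zk.start()
zk.ensure_path('/config/primary_node')

@zk.DataWatch('/config/primary_node')
def watch_primary(data, stat, event=None):
    # data is bytes, or None when the znode has been deleted
    try:
        primary = data.decode('ascii')
    except AttributeError:
        primary = 'none'
    version = stat.version if stat else '?'
    print('primary_node is now {} (version {})'.format(primary, version))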
@@ -844,7 +877,7 @@ def update_primary(new_primary, stat, event=''):
         if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']:
             logger.out('Contending for primary coordinator state', state='i')
             # Acquire an exclusive lock on the primary_node key
-            primary_lock = zkhandler.exclusivelock('/primary_node')
+            primary_lock = zkhandler.exclusivelock('/config/primary_node')
             try:
                 # This lock times out after 0.4s, which is 0.1s less than the pre-takeover
                 # timeout below, thus ensuring that a primary takeover will not deadlock
@@ -852,9 +885,9 @@ def update_primary(new_primary, stat, event=''):
                 primary_lock.acquire(timeout=0.4)
                 # Ensure when we get the lock that the versions are still consistent and that
                 # another node hasn't already acquired primary state
-                if key_version == zkhandler.zk_conn.get('/primary_node')[1].version:
+                if key_version == zkhandler.zk_conn.get('/config/primary_node')[1].version:
                     zkhandler.write([
-                        ('/primary_node', myhostname)
+                        ('/config/primary_node', myhostname)
                     ])
                 # Cleanly release the lock
                 primary_lock.release()
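The contention logic above guards against a split-brain takeover: hold a short-lived exclusive lock on the (now relocated) primary key, re-check that the key's version is still the one the watch observed, and only then write this hostname. The 0.4s lock timeout stays below the pre-takeover timeout so a takeover in progress can never deadlock on the lock. A standalone sketch assuming zkhandler.exclusivelock wraps kazoo's Lock recipe (the function name and return values are illustrative):

# Sketch: contend for primary with a short lock plus a version check.
# key_version is the znode version observed by the DataWatch callback.
from kazoo.exceptions import LockTimeout

def contend_for_primary(zk, myhostname, key_version):
    lock = zk.Lock('/config/primary_node', identifier=myhostname)
    try:
        lock.acquire(timeout=0.4)  # kept shorter than the takeover timeout
    except LockTimeout:
        return False  # another node is already contending; defer to it
    try:
        # Claim primary only if nobody changed the key since we last saw it
        if key_version == zk.get('/config/primary_node')[1].version:
            zk.set('/config/primary_node', myhostname.encode('ascii'))
            return True
        return False
    finally:
        lock.release()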
@@ -1475,11 +1508,11 @@ def node_keepalive():
     if config['enable_networking']:
         if this_node.router_state == 'primary':
             try:
-                if zkhandler.read('/upstream_ip') != config['upstream_floating_ip']:
+                if zkhandler.read('/config/upstream_ip') != config['upstream_floating_ip']:
                     raise
             except Exception:
                 zkhandler.write([
-                    ('/upstream_ip', config['upstream_floating_ip'])
+                    ('/config/upstream_ip', config['upstream_floating_ip'])
                 ])

     # Get past state and update if needed
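The keepalive hunk above reconciles the stored upstream IP with the configured floating IP on every cycle, using a bare raise inside the try to fall through to the write whenever the values differ or the key is unreadable. The same reconcile can be written without that trick; a minimal sketch (the helper name is illustrative), which also covers the primary-key check in the next hunk:

# Sketch: write `desired` into a /config key only when it differs or is missing.
from kazoo.exceptions import NoNodeError

def reconcile_key(zk, path, desired):
    try:
        current = zk.get(path)[0].decode('ascii')
    except NoNodeError:
        current = None
    if current != desired:
        zk.ensure_path(path)
        zk.set(path, desired.encode('ascii'))

# e.g. reconcile_key(zk, '/config/upstream_ip', config['upstream_floating_ip'])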
@@ -1498,9 +1531,9 @@ def node_keepalive():
     if debug:
         logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
     if this_node.router_state == 'primary':
-        if zkhandler.read('/primary_node') != this_node.name:
+        if zkhandler.read('/config/primary_node') != this_node.name:
             zkhandler.write([
-                ('/primary_node', this_node.name)
+                ('/config/primary_node', this_node.name)
             ])

     # Run VM statistics collection in separate thread for parallelization