From f913f42a6d10d0f741ddc44fad1e502585848221 Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface"
Date: Wed, 9 Jun 2021 13:28:31 -0400
Subject: [PATCH] Replace schema paths with updated zkhandler

---
 node-daemon/pvcnoded/Daemon.py | 211 ++++++++++++++++-----------------
 1 file changed, 102 insertions(+), 109 deletions(-)

diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index ddb08cec..e83f535f 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -39,7 +39,7 @@ from queue import Queue
 from xml.etree import ElementTree
 from rados import Rados
 
-from daemon_lib.zkhandler import ZKHandler, ZKSchema
+from daemon_lib.zkhandler import ZKHandler
 
 import pvcnoded.fencing as fencing
 import daemon_lib.log as log
@@ -533,39 +533,36 @@ except Exception as e:
 logger.out('Validating Zookeeper schema', state='i')
 
-# Instantiate a zkschema instance with our current schema version
-zkschema = ZKSchema()
-
 try:
-    node_schema_version = int(zkhandler.read(zkschema.path('node.data.active_schema', myhostname)))
+    node_schema_version = int(zkhandler.read(('node.data.active_schema', myhostname)))
 except Exception:
-    node_schema_version = zkhandler.read(zkschema.path('base.schema.version'))
+    node_schema_version = zkhandler.read('base.schema.version')
     zkhandler.write([
-        (zkschema.path('node.data.active_schema', myhostname), node_schema_version)
+        (('node.data.active_schema', myhostname), node_schema_version)
     ])
 
 # Load in the current node schema version
-zkschema.load(node_schema_version)
+zkhandler.schema.load(node_schema_version)
 
 # Record the latest installed schema version
-latest_schema_version = ZKSchema.find_latest()
+latest_schema_version = zkhandler.schema.find_latest()
 zkhandler.write([
-    (zkschema.path('node.data.latest_schema', myhostname), latest_schema_version)
+    (('node.data.latest_schema', myhostname), latest_schema_version)
 ])
 
 # Validate our schema against that version
-if not zkschema.validate(zkhandler, logger):
+if not zkhandler.schema.validate(zkhandler, logger):
     logger.out('Found schema violations, applying', state='i')
-    zkschema.apply(zkhandler)
+    zkhandler.schema.apply(zkhandler)
 else:
     logger.out('Schema successfully validated', state='o')
 
 # Watch for a global schema update and fire
 # This will only change by the API when triggered after seeing all nodes can update
-@zkhandler.zk_conn.DataWatch(zkschema.path('base.schema.version'))
+@zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.schema.version'))
 def update_schema(new_schema_version, stat, event=''):
-    global zkschema, zkhandler, update_timer, node_schema_version
+    global zkhandler, update_timer, node_schema_version
 
     new_schema_version = int(new_schema_version.decode('ascii'))
@@ -580,34 +577,34 @@ def update_schema(new_schema_version, stat, event=''):
     time.sleep(1)
 
     # Perform the migration (primary only)
-    if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname:
+    if zkhandler.read('base.config.primary_node') == myhostname:
         logger.out('Primary node acquiring exclusive lock', state='s')
         # Wait for things to settle
         time.sleep(0.5)
         # Acquire a write lock on the root key
-        with zkhandler.exclusivelock('/'):
+        with zkhandler.exclusivelock('base.schema.version'):
             # Perform the schema migration tasks
             logger.out('Performing schema update', state='s')
             if new_schema_version > node_schema_version:
-                zkschema.migrate(zkhandler, new_schema_version)
+                zkhandler.schema.migrate(zkhandler, new_schema_version)
             if new_schema_version < node_schema_version:
-                zkschema.rollback(zkhandler, new_schema_version)
+                zkhandler.schema.rollback(zkhandler, new_schema_version)
     # Wait for the exclusive lock to be lifted
     else:
         logger.out('Non-primary node acquiring read lock', state='s')
         # Wait for things to settle
         time.sleep(1)
         # Wait for a read lock
-        lock = zkhandler.readlock('/')
+        lock = zkhandler.readlock('base.schema.version')
         lock.acquire()
         # Wait a bit more for the primary to return to normal
         time.sleep(1)
 
     # Update the local schema version
     logger.out('Updating local schema', state='s')
-    zkschema.load(new_schema_version)
+    zkhandler.schema.load(new_schema_version)
     zkhandler.write([
-        (zkschema.path('node.data.active_schema', myhostname), new_schema_version)
+        (('node.data.active_schema', myhostname), new_schema_version)
     ])
     node_schema_version = new_schema_version
     time.sleep(1)
@@ -617,7 +614,7 @@ def update_schema(new_schema_version, stat, event=''):
     update_timer = startKeepaliveTimer()
 
     # Restart the API daemons if applicable
-    if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname:
+    if zkhandler.read('base.config.primary_node') == myhostname:
         common.run_os_command('systemctl start pvcapid.service')
         common.run_os_command('systemctl start pvcapid-worker.service')
@@ -625,14 +622,14 @@ def update_schema(new_schema_version, stat, event=''):
 
 # If we are the last node to get a schema update, fire the master update
 if latest_schema_version > node_schema_version:
     node_latest_schema_version = list()
-    for node in zkhandler.children(zkschema.path('base.node')):
-        node_latest_schema_version.append(int(zkhandler.read(zkschema.path('node.data.latest_schema', node))))
+    for node in zkhandler.children('base.node'):
+        node_latest_schema_version.append(int(zkhandler.read(('node.data.latest_schema', node))))
 
     # This is true if all elements of the latest schema version are identical to the latest version,
     # i.e. they have all had the latest schema installed and ready to load.
     if node_latest_schema_version.count(latest_schema_version) == len(node_latest_schema_version):
         zkhandler.write([
-            (zkschema.path('base.schema.version'), latest_schema_version)
+            ('base.schema.version', latest_schema_version)
         ])
 
 ###############################################################################
@@ -642,13 +639,13 @@ if latest_schema_version > node_schema_version:
 
 # Cleanup function
 def cleanup():
-    global zkhandler, zkschema, update_timer, d_domain
+    global zkhandler, update_timer, d_domain
 
     logger.out('Terminating pvcnoded and cleaning up', state='s')
 
     # Set shutdown state in Zookeeper
     zkhandler.write([
-        ('/nodes/{}/daemonstate'.format(myhostname), 'shutdown')
+        (('node.state.daemon', myhostname), 'shutdown')
     ])
 
     # Waiting for any flushes to complete
@@ -671,7 +668,7 @@ def cleanup():
     try:
         if this_node.router_state == 'primary':
             zkhandler.write([
-                ('/config/primary_node', 'none')
+                ('base.config.primary_node', 'none')
             ])
             logger.out('Waiting for primary migration', state='s')
             while this_node.router_state != 'secondary':
@@ -692,7 +689,7 @@ def cleanup():
 
     # Set stop state in Zookeeper
     zkhandler.write([
-        ('/nodes/{}/daemonstate'.format(myhostname), 'stop')
+        (('node.state.daemon', myhostname), 'stop')
     ])
 
     # Forcibly terminate dnsmasq because it gets stuck sometimes
@@ -700,7 +697,6 @@ def cleanup():
 
     # Close the Zookeeper connection
     try:
-        del zkschema
         zkhandler.disconnect()
         del zkhandler
     except Exception:
@@ -736,50 +732,47 @@ if config['daemon_mode'] == 'coordinator':
         init_routerstate = 'secondary'
     else:
         init_routerstate = 'client'
 
-if zkhandler.exists('/nodes/{}'.format(myhostname)):
+if zkhandler.exists(('node', myhostname)):
     logger.out("Node is " + fmt_green + "present" + fmt_end + " in Zookeeper", state='i')
     # Update static data just in case it's changed
     zkhandler.write([
-        ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
-        ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
-        ('/nodes/{}/routerstate'.format(myhostname), init_routerstate),
-        ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
-        # Keepalives and fencing information (always load and set from config on boot)
-        ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
-        ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
-        ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
+        (('node.mode', myhostname), config['daemon_mode']),
+        (('node.state.daemon', myhostname), 'init'),
+        (('node.state.router', myhostname), init_routerstate),
+        (('node.data.static', myhostname), ' '.join(staticdata)),
+        (('node.ipmi.hostname', myhostname), config['ipmi_hostname']),
+        (('node.ipmi.username', myhostname), config['ipmi_username']),
+        (('node.ipmi.password', myhostname), config['ipmi_password']),
     ])
 else:
     logger.out("Node is " + fmt_red + "absent" + fmt_end + " in Zookeeper; adding new node", state='i')
     keepalive_time = int(time.time())
     zkhandler.write([
-        ('/nodes/{}'.format(myhostname), config['daemon_mode']),
-        # Basic state information
-        ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
-        ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
-        ('/nodes/{}/routerstate'.format(myhostname), init_routerstate),
-        ('/nodes/{}/domainstate'.format(myhostname), 'flushed'),
-        ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
-        ('/nodes/{}/memtotal'.format(myhostname), '0'),
-        ('/nodes/{}/memfree'.format(myhostname), '0'),
-        ('/nodes/{}/memused'.format(myhostname), '0'),
-        ('/nodes/{}/memalloc'.format(myhostname), '0'),
-        ('/nodes/{}/memprov'.format(myhostname), '0'),
-        ('/nodes/{}/vcpualloc'.format(myhostname), '0'),
-        ('/nodes/{}/cpuload'.format(myhostname), '0.0'),
-        ('/nodes/{}/networkscount'.format(myhostname), '0'),
-        ('/nodes/{}/domainscount'.format(myhostname), '0'),
-        ('/nodes/{}/runningdomains'.format(myhostname), ''),
-        # Keepalives and fencing information
-        ('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time)),
-        ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
-        ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
-        ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
+        (('node', myhostname), config['daemon_mode']),
+        (('node.keepalive', myhostname), str(keepalive_time)),
+        (('node.mode', myhostname), config['daemon_mode']),
+        (('node.state.daemon', myhostname), 'init'),
+        (('node.state.domain', myhostname), 'flushed'),
+        (('node.state.router', myhostname), init_routerstate),
+        (('node.data.static', myhostname), ' '.join(staticdata)),
+        (('node.ipmi.hostname', myhostname), config['ipmi_hostname']),
+        (('node.ipmi.username', myhostname), config['ipmi_username']),
+        (('node.ipmi.password', myhostname), config['ipmi_password']),
+        (('node.memory.total', myhostname), '0'),
+        (('node.memory.used', myhostname), '0'),
+        (('node.memory.free', myhostname), '0'),
+        (('node.memory.allocated', myhostname), '0'),
+        (('node.memory.provisioned', myhostname), '0'),
+        (('node.vcpu.allocated', myhostname), '0'),
+        (('node.cpu.load', myhostname), '0.0'),
+        (('node.running_domains', myhostname), '0'),
+        (('node.count.provisioned_domains', myhostname), '0'),
+        (('node.count.networks', myhostname), '0'),
     ])
 
 # Check that the primary key exists, and create it with us as master if not
 try:
-    current_primary = zkhandler.read('/config/primary_node')
+    current_primary = zkhandler.read('base.config.primary_node')
 except kazoo.exceptions.NoNodeError:
     current_primary = 'none'
@@ -787,9 +780,9 @@ if current_primary and current_primary != 'none':
     logger.out('Current primary node is {}{}{}.'.format(fmt_blue, current_primary, fmt_end), state='i')
 else:
     if config['daemon_mode'] == 'coordinator':
-        logger.out('No primary node found; creating with us as primary.', state='i')
+        logger.out('No primary node found; setting us as primary.', state='i')
         zkhandler.write([
-            ('/config/primary_node', myhostname)
+            ('base.config.primary_node', myhostname)
         ])
 
 ###############################################################################
@@ -896,7 +889,7 @@ else:
 
 # Node objects
-@zkhandler.zk_conn.ChildrenWatch('/nodes')
+@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.node'))
 def update_nodes(new_node_list):
     global node_list, d_node
@@ -925,7 +918,7 @@ this_node = d_node[myhostname]
 
 # Maintenance mode
-@zkhandler.zk_conn.DataWatch('/config/maintenance')
+@zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.config.maintenance'))
 def set_maintenance(_maintenance, stat, event=''):
     global maintenance
     try:
@@ -935,7 +928,7 @@ def set_maintenance(_maintenance, stat, event=''):
 
 # Primary node
-@zkhandler.zk_conn.DataWatch('/config/primary_node')
+@zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.config.primary_node'))
 def update_primary(new_primary, stat, event=''):
     try:
         new_primary = new_primary.decode('ascii')
@@ -950,7 +943,7 @@ def update_primary(new_primary, stat, event=''):
             if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']:
                 logger.out('Contending for primary coordinator state', state='i')
                 # Acquire an exclusive lock on the primary_node key
-                primary_lock = zkhandler.exclusivelock('/config/primary_node')
+                primary_lock = zkhandler.exclusivelock('base.config.primary_node')
                 try:
                     # This lock times out after 0.4s, which is 0.1s less than the pre-takeover
@@ -958,9 +951,9 @@ def update_primary(new_primary, stat, event=''):
                     # timeout below, thus ensuring that a primary takeover will not deadlock
                     primary_lock.acquire(timeout=0.4)
                     # Ensure when we get the lock that the versions are still consistent and that
                     # another node hasn't already acquired primary state
-                    if key_version == zkhandler.zk_conn.get('/config/primary_node')[1].version:
+                    if key_version == zkhandler.zk_conn.get(zkhandler.schema.path('base.config.primary_node'))[1].version:
                         zkhandler.write([
-                            ('/config/primary_node', myhostname)
+                            ('base.config.primary_node', myhostname)
                         ])
                     # Cleanly release the lock
                     primary_lock.release()
@@ -971,17 +964,17 @@ def update_primary(new_primary, stat, event=''):
         if this_node.router_state == 'secondary':
             time.sleep(0.5)
             zkhandler.write([
-                ('/nodes/{}/routerstate'.format(myhostname), 'takeover')
+                (('node.state.router', myhostname), 'takeover')
             ])
     else:
         if this_node.router_state == 'primary':
             time.sleep(0.5)
             zkhandler.write([
-                ('/nodes/{}/routerstate'.format(myhostname), 'relinquish')
+                (('node.state.router', myhostname), 'relinquish')
             ])
         else:
             zkhandler.write([
-                ('/nodes/{}/routerstate'.format(myhostname), 'client')
+                (('node.state.router', myhostname), 'client')
             ])
 
     for node in d_node:
@@ -990,7 +983,7 @@ def update_primary(new_primary, stat, event=''):
 
 if enable_networking:
     # Network objects
-    @zkhandler.zk_conn.ChildrenWatch('/networks')
+    @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.network'))
     def update_networks(new_network_list):
         global network_list, d_network
@@ -1033,13 +1026,13 @@ if enable_networking:
 
 if enable_hypervisor:
     # VM command pipeline key
-    @zkhandler.zk_conn.DataWatch('/cmd/domains')
+    @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.cmd.domain'))
     def cmd_domains(data, stat, event=''):
         if data:
             VMInstance.run_command(zkhandler, logger, this_node, data.decode('ascii'))
 
     # VM domain objects
-    @zkhandler.zk_conn.ChildrenWatch('/domains')
+    @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.domain'))
     def update_domains(new_domain_list):
         global domain_list, d_domain
@@ -1064,13 +1057,13 @@ if enable_hypervisor:
 
 if enable_storage:
     # Ceph command pipeline key
-    @zkhandler.zk_conn.DataWatch('/cmd/ceph')
+    @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.cmd.ceph'))
     def cmd_ceph(data, stat, event=''):
         if data:
             CephInstance.run_command(zkhandler, logger, this_node, data.decode('ascii'), d_osd)
 
     # OSD objects
-    @zkhandler.zk_conn.ChildrenWatch('/ceph/osds')
+    @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.osd'))
     def update_osds(new_osd_list):
         global osd_list, d_osd
@@ -1090,7 +1083,7 @@ if enable_storage:
         logger.out('{}OSD list:{} {}'.format(fmt_blue, fmt_end, ' '.join(osd_list)), state='i')
 
     # Pool objects
-    @zkhandler.zk_conn.ChildrenWatch('/ceph/pools')
+    @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.pool'))
     def update_pools(new_pool_list):
         global pool_list, d_pool
@@ -1113,7 +1106,7 @@ if enable_storage:
 
     # Volume objects in each pool
     for pool in pool_list:
-        @zkhandler.zk_conn.ChildrenWatch('/ceph/volumes/{}'.format(pool))
+        @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('volume', pool))
         def update_volumes(new_volume_list):
             global volume_list, d_volume
@@ -1180,7 +1173,7 @@ def collect_ceph_stats(queue):
         ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
         try:
             zkhandler.write([
-                ('/ceph', str(ceph_status))
+                ('base.storage', str(ceph_status))
             ])
         except Exception as e:
             logger.out('Failed to set Ceph status data: {}'.format(e), state='e')
@@ -1194,7 +1187,7 @@ def collect_ceph_stats(queue):
         ceph_df = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
         try:
             zkhandler.write([
-                ('/ceph/util', str(ceph_df))
+                ('base.storage.util', str(ceph_df))
             ])
         except Exception as e:
             logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e')
@@ -1259,7 +1252,7 @@ def collect_ceph_stats(queue):
 
                 # Write the pool data to Zookeeper
                 zkhandler.write([
-                    ('/ceph/pools/{}/stats'.format(pool['name']), str(json.dumps(pool_df)))
+                    (('pool.stats', pool['name']), str(json.dumps(pool_df)))
                 ])
             except Exception as e:
                 # One or more of the status commands timed out, just continue
@@ -1395,7 +1388,7 @@ def collect_ceph_stats(queue):
             try:
                 stats = json.dumps(osd_stats[osd])
                 zkhandler.write([
-                    ('/ceph/osds/{}/stats'.format(osd), str(stats))
+                    (('osd.stats', osd), str(stats))
                 ])
             except KeyError as e:
                 # One or more of the status commands timed out, just continue
@@ -1462,7 +1455,7 @@ def collect_vm_stats(queue):
                 # Toggle a state "change"
                 logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
                 zkhandler.write([
-                    ('/domains/{}/state'.format(domain), instance.getstate())
+                    (('domain.state', domain), instance.getstate())
                 ])
         elif instance.getnode() == this_node.name:
             memprov += instance.getmemory()
@@ -1554,7 +1547,7 @@ def collect_vm_stats(queue):
 
         try:
             zkhandler.write([
-                ("/domains/{}/stats".format(domain_uuid), str(json.dumps(domain_stats)))
+                (('domain.stats', domain_uuid), str(json.dumps(domain_stats)))
             ])
         except Exception as e:
             if debug:
@@ -1581,32 +1574,32 @@ def node_keepalive():
     if config['enable_hypervisor']:
         if this_node.router_state == 'primary':
             try:
-                if zkhandler.read('/config/migration_target_selector') != config['migration_target_selector']:
+                if zkhandler.read('base.config.migration_target_selector') != config['migration_target_selector']:
                     raise
             except Exception:
                 zkhandler.write([
-                    ('/config/migration_target_selector', config['migration_target_selector'])
+                    ('base.config.migration_target_selector', config['migration_target_selector'])
                 ])
 
     # Set the upstream IP in Zookeeper for clients to read
     if config['enable_networking']:
         if this_node.router_state == 'primary':
             try:
-                if zkhandler.read('/config/upstream_ip') != config['upstream_floating_ip']:
+                if zkhandler.read('base.config.upstream_ip') != config['upstream_floating_ip']:
                     raise
             except Exception:
                 zkhandler.write([
-                    ('/config/upstream_ip', config['upstream_floating_ip'])
+                    ('base.config.upstream_ip', config['upstream_floating_ip'])
                 ])
 
     # Get past state and update if needed
     if debug:
         logger.out("Get past state and update if needed", state='d', prefix='main-thread')
-    past_state = zkhandler.read('/nodes/{}/daemonstate'.format(this_node.name))
+    past_state = zkhandler.read(('node.state.daemon', this_node.name))
     if past_state != 'run':
         this_node.daemon_state = 'run'
         zkhandler.write([
-            ('/nodes/{}/daemonstate'.format(this_node.name), 'run')
+            (('node.state.daemon', this_node.name), 'run')
         ])
     else:
         this_node.daemon_state = 'run'
@@ -1615,9 +1608,9 @@ def node_keepalive():
     if debug:
         logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
     if this_node.router_state == 'primary':
-        if zkhandler.read('/config/primary_node') != this_node.name:
+        if zkhandler.read('base.config.primary_node') != this_node.name:
             zkhandler.write([
-                ('/config/primary_node', this_node.name)
+                ('base.config.primary_node', this_node.name)
             ])
 
     # Run VM statistics collection in separate thread for parallelization
@@ -1679,16 +1672,16 @@ def node_keepalive():
         logger.out("Set our information in zookeeper", state='d', prefix='main-thread')
     try:
         zkhandler.write([
-            ('/nodes/{}/memtotal'.format(this_node.name), str(this_node.memtotal)),
-            ('/nodes/{}/memused'.format(this_node.name), str(this_node.memused)),
-            ('/nodes/{}/memfree'.format(this_node.name), str(this_node.memfree)),
-            ('/nodes/{}/memalloc'.format(this_node.name), str(this_node.memalloc)),
-            ('/nodes/{}/memprov'.format(this_node.name), str(this_node.memprov)),
-            ('/nodes/{}/vcpualloc'.format(this_node.name), str(this_node.vcpualloc)),
-            ('/nodes/{}/cpuload'.format(this_node.name), str(this_node.cpuload)),
-            ('/nodes/{}/domainscount'.format(this_node.name), str(this_node.domains_count)),
-            ('/nodes/{}/runningdomains'.format(this_node.name), ' '.join(this_node.domain_list)),
-            ('/nodes/{}/keepalive'.format(this_node.name), str(keepalive_time))
+            (('node.memory.total', this_node.name), str(this_node.memtotal)),
+            (('node.memory.used', this_node.name), str(this_node.memused)),
+            (('node.memory.free', this_node.name), str(this_node.memfree)),
+            (('node.memory.allocated', this_node.name), str(this_node.memalloc)),
+            (('node.memory.provisioned', this_node.name), str(this_node.memprov)),
+            (('node.vcpu.allocated', this_node.name), str(this_node.vcpualloc)),
+            (('node.cpu.load', this_node.name), str(this_node.cpuload)),
+            (('node.count.provisioned_domains', this_node.name), str(this_node.domains_count)),
+            (('node.running_domains', this_node.name), ' '.join(this_node.domain_list)),
+            (('node.keepalive', this_node.name), str(keepalive_time)),
         ])
     except Exception:
         logger.out('Failed to set keepalive data', state='e')
@@ -1758,8 +1751,8 @@ def node_keepalive():
     if config['daemon_mode'] == 'coordinator':
         for node_name in d_node:
             try:
-                node_daemon_state = zkhandler.read('/nodes/{}/daemonstate'.format(node_name))
-                node_keepalive = int(zkhandler.read('/nodes/{}/keepalive'.format(node_name)))
+                node_daemon_state = zkhandler.read(('node.state.daemon', node_name))
+                node_keepalive = int(zkhandler.read(('node.keepalive', node_name)))
             except Exception:
                 node_daemon_state = 'unknown'
                 node_keepalive = 0
@@ -1770,16 +1763,16 @@ def node_keepalive():
             node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
             if node_keepalive < node_deadtime and node_daemon_state == 'run':
                 logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
-                zk_lock = zkhandler.writelock('/nodes/{}/daemonstate'.format(node_name))
+                zk_lock = zkhandler.writelock(('node.state.daemon', node_name))
                 with zk_lock:
                     # Ensures that, if we lost the lock race and come out of waiting,
                     # we won't try to trigger our own fence thread.
-                    if zkhandler.read('/nodes/{}/daemonstate'.format(node_name)) != 'dead':
+                    if zkhandler.read(('node.state.daemon', node_name)) != 'dead':
                         fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zkhandler, config, logger), kwargs={})
                         fence_thread.start()
                         # Write the updated data after we start the fence thread
                        zkhandler.write([
-                            ('/nodes/{}/daemonstate'.format(node_name), 'dead')
+                            (('node.state.daemon', node_name), 'dead')
                         ])
 
     if debug:
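
Note on the key convention used throughout this patch: a bare string (e.g. 'base.config.primary_node') names a fixed schema path, while a ('name', child) tuple (e.g. ('node.state.daemon', myhostname)) names a per-object path templated on the child. Below is a minimal illustrative sketch of that resolution pattern, not the real implementation (which lives in daemon_lib/zkhandler.py); the SCHEMA entries and resolve() helper are assumptions for demonstration, though the target paths mirror the legacy '-' lines removed above.

    # Illustrative sketch only; not the actual ZKHandler/ZKSchema code.
    SCHEMA = {
        'base.config.primary_node': '/config/primary_node',  # static path
        'node.state.daemon': '/nodes/{}/daemonstate',        # per-node path
        'node.keepalive': '/nodes/{}/keepalive',             # per-node path
    }

    def resolve(key):
        # A bare string maps directly to its path; a (name, child) tuple
        # templates the child (hostname, pool, domain, ...) into the path.
        if isinstance(key, tuple):
            name, child = key
            return SCHEMA[name].format(child)
        return SCHEMA[key]

    assert resolve('base.config.primary_node') == '/config/primary_node'
    assert resolve(('node.state.daemon', 'hv1')) == '/nodes/hv1/daemonstate'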