diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index 39ae7410..03fd7869 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -44,6 +44,8 @@ import apscheduler.schedulers.background
 
 from distutils.util import strtobool
 
+from queue import Queue
+
 import pvcnoded.log as log
 import pvcnoded.zkhandler as zkhandler
 import pvcnoded.fencing as fencing
@@ -1011,6 +1013,236 @@ if enable_storage:
 # PHASE 9 - Run the daemon
 ###############################################################################
 
+# Ceph stats update function
+def collect_ceph_stats(queue):
+    # Get Ceph cluster health (for local printing)
+    if debug:
+        print("Get Ceph cluster health (for local printing)")
+    retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
+    ceph_health = stdout.rstrip()
+    if 'HEALTH_OK' in ceph_health:
+        ceph_health_colour = fmt_green
+    elif 'HEALTH_WARN' in ceph_health:
+        ceph_health_colour = fmt_yellow
+    else:
+        ceph_health_colour = fmt_red
+
+    # Set ceph health information in zookeeper (primary only)
+    if this_node.router_state == 'primary':
+        if debug:
+            print("Set ceph health information in zookeeper (primary only)")
+        # Get status info
+        retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
+        ceph_status = stdout
+        try:
+            zkhandler.writedata(zk_conn, {
+                '/ceph': str(ceph_status)
+            })
+        except:
+            logger.out('Failed to set Ceph status data', state='e')
+            return
+
+    # Set ceph rados df information in zookeeper (primary only)
+    if this_node.router_state == 'primary':
+        if debug:
+            print("Set ceph rados df information in zookeeper (primary only)")
+        # Get rados df info
+        retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
+        rados_df = stdout
+        try:
+            zkhandler.writedata(zk_conn, {
+                '/ceph/radosdf': str(rados_df)
+            })
+        except:
+            logger.out('Failed to set Rados space data', state='e')
+            return
+
+    # Set pool information in zookeeper (primary only)
+    if this_node.router_state == 'primary':
+        if debug:
+            print("Set pool information in zookeeper (primary only)")
+
+        # Get pool info
+        retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1)
+        try:
+            ceph_pool_df_raw = json.loads(stdout)['pools']
+        except json.decoder.JSONDecodeError:
+            logger.out('Failed to obtain Pool data (ceph df)', state='w')
+            ceph_pool_df_raw = []
+
+        retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
+        try:
+            rados_pool_df_raw = json.loads(stdout)['pools']
+        except json.decoder.JSONDecodeError:
+            logger.out('Failed to obtain Pool data (rados df)', state='w')
+            rados_pool_df_raw = []
+
+        pool_count = len(ceph_pool_df_raw)
+        if debug:
+            print("Getting info for {} pools".format(pool_count))
+        for pool_idx in range(0, pool_count):
+            try:
+                # Combine all the data for this pool
+                ceph_pool_df = ceph_pool_df_raw[pool_idx]
+                rados_pool_df = rados_pool_df_raw[pool_idx]
+                pool = ceph_pool_df
+                pool.update(rados_pool_df)
+
+                # Ignore any pools that aren't in our pool list
+                if pool['name'] not in pool_list:
+                    if debug:
+                        print("Pool {} not in pool list {}".format(pool['name'], pool_list))
+                    continue
+                else:
+                    if debug:
+                        print("Parsing data for pool {}".format(pool['name']))
+
+                # Assemble a useful data structure
+                pool_df = {
+                    'id': pool['id'],
+                    'free_bytes': pool['stats']['max_avail'],
+                    'used_bytes': pool['stats']['bytes_used'],
+                    'used_percent': pool['stats']['percent_used'],
+                    'num_objects': pool['stats']['objects'],
+                    'num_object_clones': pool['num_object_clones'],
+                    'num_object_copies': pool['num_object_copies'],
+                    'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
+                    'num_objects_unfound': pool['num_objects_unfound'],
+                    'num_objects_degraded': pool['num_objects_degraded'],
+                    'read_ops': pool['read_ops'],
+                    'read_bytes': pool['read_bytes'],
+                    'write_ops': pool['write_ops'],
+                    'write_bytes': pool['write_bytes']
+                }
+
+                # Write the pool data to Zookeeper
+                zkhandler.writedata(zk_conn, {
+                    '/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df))
+                })
+            except Exception as e:
+                # One or more of the status commands timed out, just continue
+                logger.out('Failed to format and send pool data', state='w')
+                pass
+
+    # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
+    osds_this_node = 0
+    if len(osd_list) > 0:
+        # Get data from Ceph OSDs
+        if debug:
+            print("Get data from Ceph OSDs")
+        # Parse the dump data
+        osd_dump = dict()
+        retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
+        try:
+            osd_dump_raw = json.loads(stdout)['osds']
+        except json.decoder.JSONDecodeError:
+            logger.out('Failed to obtain OSD data', state='w')
+            osd_dump_raw = []
+
+        if debug:
+            print("Loop through OSD dump")
+        for osd in osd_dump_raw:
+            osd_dump.update({
+                str(osd['osd']): {
+                    'uuid': osd['uuid'],
+                    'up': osd['up'],
+                    'in': osd['in'],
+                    'primary_affinity': osd['primary_affinity']
+                }
+            })
+
+        # Parse the df data
+        if debug:
+            print("Parse the OSD df data")
+        osd_df = dict()
+        retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
+        try:
+            osd_df_raw = json.loads(stdout)['nodes']
+        except Exception as e:
+            logger.out('Failed to parse OSD list: {}'.format(e), state='w')
+            osd_df_raw = []
+
+        if debug:
+            print("Loop through OSD df")
+        for osd in osd_df_raw:
+            osd_df.update({
+                str(osd['id']): {
+                    'utilization': osd['utilization'],
+                    'var': osd['var'],
+                    'pgs': osd['pgs'],
+                    'kb': osd['kb'],
+                    'weight': osd['crush_weight'],
+                    'reweight': osd['reweight'],
+                }
+            })
+        # Parse the status data
+        if debug:
+            print("Parse the OSD status data")
+        osd_status = dict()
+        retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
+        if debug:
+            print("Loop through OSD status data")
+        for line in stdout.split('\n'):
+            # Strip off colour
+            line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
+            # Split it for parsing
+            line = line.split()
+            if len(line) > 1 and line[1].isdigit():
+                # This is an OSD line so parse it
+                osd_id = line[1]
+                node = line[3].split('.')[0]
+                used = line[5]
+                avail = line[7]
+                wr_ops = line[9]
+                wr_data = line[11]
+                rd_ops = line[13]
+                rd_data = line[15]
+                state = line[17]
+                osd_status.update({
+                    str(osd_id): {
+                        'node': node,
+                        'used': used,
+                        'avail': avail,
+                        'wr_ops': wr_ops,
+                        'wr_data': wr_data,
+                        'rd_ops': rd_ops,
+                        'rd_data': rd_data,
+                        'state': state
+                    }
+                })
+        # Merge them together into a single meaningful dict
+        if debug:
+            print("Merge OSD data together")
+        osd_stats = dict()
+        for osd in osd_list:
+            try:
+                this_dump = osd_dump[osd]
+                this_dump.update(osd_df[osd])
+                this_dump.update(osd_status[osd])
+                osd_stats[osd] = this_dump
+            except KeyError as e:
+                # One or more of the status commands timed out, just continue
+                logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')
+
+        # Trigger updates for each OSD on this node
+        if debug:
+            print("Trigger updates for each OSD on this node")
+        for osd in osd_list:
+            if d_osd[osd].node == myhostname:
+                try:
+                    stats = json.dumps(osd_stats[osd])
+                    zkhandler.writedata(zk_conn, {
+                        '/ceph/osds/{}/stats'.format(osd): str(stats)
+                    })
+                except KeyError as e:
+                    # One or more of the status commands timed out, just continue
+                    logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
+                osds_this_node += 1
+
+    queue.put(ceph_health_colour)
+    queue.put(ceph_health)
+    queue.put(osds_this_node)
+
 # Keepalive update function
 def node_keepalive():
     # Set the upstream IP in Zookeeper for clients to read
@@ -1039,231 +1271,12 @@ def node_keepalive():
         if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
             zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
 
+    # Run Ceph status collection in separate thread for parallelization
     if enable_storage:
-        # Get Ceph cluster health (for local printing)
-        if debug:
-            print("Get Ceph cluster health (for local printing)")
-        retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
-        ceph_health = stdout.rstrip()
-        if 'HEALTH_OK' in ceph_health:
-            ceph_health_colour = fmt_green
-        elif 'HEALTH_WARN' in ceph_health:
-            ceph_health_colour = fmt_yellow
-        else:
-            ceph_health_colour = fmt_red
-
-        # Set ceph health information in zookeeper (primary only)
-        if this_node.router_state == 'primary':
-            if debug:
-                print("Set ceph health information in zookeeper (primary only)")
-            # Get status info
-            retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
-            ceph_status = stdout
-            try:
-                zkhandler.writedata(zk_conn, {
-                    '/ceph': str(ceph_status)
-                })
-            except:
-                logger.out('Failed to set Ceph status data', state='e')
-                return
-
-        # Set ceph rados df information in zookeeper (primary only)
-        if this_node.router_state == 'primary':
-            if debug:
-                print("Set ceph rados df information in zookeeper (primary only)")
-            # Get rados df info
-            retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
-            rados_df = stdout
-            try:
-                zkhandler.writedata(zk_conn, {
-                    '/ceph/radosdf': str(rados_df)
-                })
-            except:
-                logger.out('Failed to set Rados space data', state='e')
-                return
-
-        # Set pool information in zookeeper (primary only)
-        if this_node.router_state == 'primary':
-            if debug:
-                print("Set pool information in zookeeper (primary only)")
-
-            # Get pool info
-            retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1)
-            try:
-                ceph_pool_df_raw = json.loads(stdout)['pools']
-            except json.decoder.JSONDecodeError:
-                logger.out('Failed to obtain Pool data (ceph df)', state='w')
-                ceph_pool_df_raw = []
-
-            retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
-            try:
-                rados_pool_df_raw = json.loads(stdout)['pools']
-            except json.decoder.JSONDecodeError:
-                logger.out('Failed to obtain Pool data (rados df)', state='w')
-                rados_pool_df_raw = []
-
-            pool_count = len(ceph_pool_df_raw)
-            if debug:
-                print("Getting info for {} pools".format(pool_count))
-            for pool_idx in range(0, pool_count):
-                try:
-                    # Combine all the data for this pool
-                    ceph_pool_df = ceph_pool_df_raw[pool_idx]
-                    rados_pool_df = rados_pool_df_raw[pool_idx]
-                    pool = ceph_pool_df
-                    pool.update(rados_pool_df)
-
-                    # Ignore any pools that aren't in our pool list
-                    if pool['name'] not in pool_list:
-                        if debug:
-                            print("Pool {} not in pool list {}".format(pool['name'], pool_list))
-                        continue
-                    else:
-                        if debug:
-                            print("Parsing data for pool {}".format(pool['name']))
-
-                    # Assemble a useful data structure
-                    pool_df = {
-                        'id': pool['id'],
-                        'free_bytes': pool['stats']['max_avail'],
-                        'used_bytes': pool['stats']['bytes_used'],
-                        'used_percent': pool['stats']['percent_used'],
-                        'num_objects': pool['stats']['objects'],
-                        'num_object_clones': pool['num_object_clones'],
-                        'num_object_copies': pool['num_object_copies'],
-                        'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
-                        'num_objects_unfound': pool['num_objects_unfound'],
-                        'num_objects_degraded': pool['num_objects_degraded'],
-                        'read_ops': pool['read_ops'],
-                        'read_bytes': pool['read_bytes'],
-                        'write_ops': pool['write_ops'],
-                        'write_bytes': pool['write_bytes']
-                    }
-
-                    # Write the pool data to Zookeeper
-                    zkhandler.writedata(zk_conn, {
-                        '/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df))
-                    })
-                except Exception as e:
-                    # One or more of the status commands timed out, just continue
-                    logger.out('Failed to format and send pool data', state='w')
-                    pass
-
-        # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
-        osds_this_node = 0
-        if len(osd_list) > 0:
-            # Get data from Ceph OSDs
-            if debug:
-                print("Get data from Ceph OSDs")
-            # Parse the dump data
-            osd_dump = dict()
-            retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
-            try:
-                osd_dump_raw = json.loads(stdout)['osds']
-            except json.decoder.JSONDecodeError:
-                logger.out('Failed to obtain OSD data', state='w')
-                osd_dump_raw = []
-
-            if debug:
-                print("Loop through OSD dump")
-            for osd in osd_dump_raw:
-                osd_dump.update({
-                    str(osd['osd']): {
-                        'uuid': osd['uuid'],
-                        'up': osd['up'],
-                        'in': osd['in'],
-                        'primary_affinity': osd['primary_affinity']
-                    }
-                })
-
-            # Parse the df data
-            if debug:
-                print("Parse the OSD df data")
-            osd_df = dict()
-            retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
-            try:
-                osd_df_raw = json.loads(stdout)['nodes']
-            except Exception as e:
-                logger.out('Failed to parse OSD list: {}'.format(e), state='w')
-                osd_df_raw = []
-
-            if debug:
-                print("Loop through OSD df")
-            for osd in osd_df_raw:
-                osd_df.update({
-                    str(osd['id']): {
-                        'utilization': osd['utilization'],
-                        'var': osd['var'],
-                        'pgs': osd['pgs'],
-                        'kb': osd['kb'],
-                        'weight': osd['crush_weight'],
-                        'reweight': osd['reweight'],
-                    }
-                })
-            # Parse the status data
-            if debug:
-                print("Parse the OSD status data")
-            osd_status = dict()
-            retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
-            if debug:
-                print("Loop through OSD status data")
-            for line in stdout.split('\n'):
-                # Strip off colour
-                line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
-                # Split it for parsing
-                line = line.split()
-                if len(line) > 1 and line[1].isdigit():
-                    # This is an OSD line so parse it
-                    osd_id = line[1]
-                    node = line[3].split('.')[0]
-                    used = line[5]
-                    avail = line[7]
-                    wr_ops = line[9]
-                    wr_data = line[11]
-                    rd_ops = line[13]
-                    rd_data = line[15]
-                    state = line[17]
-                    osd_status.update({
-                        str(osd_id): {
-                            'node': node,
-                            'used': used,
-                            'avail': avail,
-                            'wr_ops': wr_ops,
-                            'wr_data': wr_data,
-                            'rd_ops': rd_ops,
-                            'rd_data': rd_data,
-                            'state': state
-                        }
-                    })
-            # Merge them together into a single meaningful dict
-            if debug:
-                print("Merge OSD data together")
-            osd_stats = dict()
-            for osd in osd_list:
-                try:
-                    this_dump = osd_dump[osd]
-                    this_dump.update(osd_df[osd])
-                    this_dump.update(osd_status[osd])
-                    osd_stats[osd] = this_dump
-                except KeyError as e:
-                    # One or more of the status commands timed out, just continue
-                    logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')
-
-            # Trigger updates for each OSD on this node
-            if debug:
-                print("Trigger updates for each OSD on this node")
-            for osd in osd_list:
-                if d_osd[osd].node == myhostname:
-                    try:
-                        stats = json.dumps(osd_stats[osd])
-                        zkhandler.writedata(zk_conn, {
-                            '/ceph/osds/{}/stats'.format(osd): str(stats)
-                        })
-                    except KeyError as e:
-                        # One or more of the status commands timed out, just continue
-                        logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
-                    osds_this_node += 1
-
+        ceph_thread_queue = Queue()
+        ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
+        ceph_stats_thread.start()
+
     # Normalize running VM status
     memalloc = 0
     vcpualloc = 0
@@ -1318,6 +1331,14 @@ def node_keepalive():
     else:
         this_node.domains_count = 0
 
+    # Wait for Ceph thread completion
+    if enable_storage:
+        ceph_stats_thread.join()
+
+        ceph_health_colour = ceph_thread_queue.get()
+        ceph_health = ceph_thread_queue.get()
+        osds_this_node = ceph_thread_queue.get()
+
     # Set our information in zookeeper
     keepalive_time = int(time.time())
     if debug:
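
For context, the patch moves the Ceph statistics gathering out of the body of node_keepalive() and into a worker thread that hands its results back through a queue.Queue. The snippet below is a minimal, self-contained sketch of that thread-plus-queue hand-off pattern only; gather_stats() and its three dummy values are hypothetical stand-ins for collect_ceph_stats() and are not part of the patch.

import threading
from queue import Queue

# Hypothetical stand-in for collect_ceph_stats(): do the slow work off the
# main loop, then publish the results in a fixed, known order.
def gather_stats(queue):
    queue.put('green')        # stand-in for ceph_health_colour
    queue.put('HEALTH_OK')    # stand-in for ceph_health
    queue.put(2)              # stand-in for osds_this_node

def keepalive_iteration():
    # Start the collector at the top of the keepalive run...
    stats_queue = Queue()
    stats_thread = threading.Thread(target=gather_stats, args=(stats_queue,))
    stats_thread.start()

    # ...do the rest of the keepalive work here, in parallel with the collector...

    # ...then join the worker and drain the queue in the same order it was filled.
    stats_thread.join()
    health_colour = stats_queue.get()
    health = stats_queue.get()
    osd_count = stats_queue.get()
    print('{} (colour {}): {} OSDs on this node'.format(health, health_colour, osd_count))

if __name__ == '__main__':
    keepalive_iteration()

Because the values are put and retrieved in a fixed order, the queue acts as a simple ordered channel between the two threads, and joining the worker before the get() calls gives the same sequencing the patch adds near the end of node_keepalive().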