Implement node logging into Zookeeper
Adds the ability to send node daemon logs to Zookeeper to facilitate a command like "pvc node log", similar to "pvc vm log". Each node stores its logs in a separate tree under "/logs" which can then be combined or queried. By default, set by config, only 2000 lines are kept.
This commit is contained in:
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# log.py - Output (stdout + logfile) functions
|
||||
# log.py - PVC daemon logger functions
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
|
||||
@ -19,7 +19,12 @@
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
import datetime
|
||||
from collections import deque
|
||||
from threading import Thread
|
||||
from queue import Queue
|
||||
from datetime import datetime
|
||||
|
||||
from daemon_lib.zkhandler import ZKHandler
|
||||
|
||||
|
||||
class Logger(object):
|
||||
@ -77,6 +82,10 @@ class Logger(object):
|
||||
self.last_colour = ''
|
||||
self.last_prompt = ''
|
||||
|
||||
if self.config['zookeeper_logging']:
|
||||
self.zookeeper_logger = ZookeeperLogger(config)
|
||||
self.zookeeper_logger.start()
|
||||
|
||||
# Provide a hup function to close and reopen the writer
|
||||
def hup(self):
|
||||
self.writer.close()
|
||||
@ -87,7 +96,7 @@ class Logger(object):
|
||||
|
||||
# Get the date
|
||||
if self.config['log_dates']:
|
||||
date = '{} - '.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
|
||||
date = '{} - '.format(datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
|
||||
else:
|
||||
date = ''
|
||||
|
||||
@ -123,6 +132,53 @@ class Logger(object):
|
||||
if self.config['file_logging']:
|
||||
self.writer.write(message + '\n')
|
||||
|
||||
# Log to Zookeeper
|
||||
if self.config['zookeeper_logging']:
|
||||
self.zookeeper_logger.queue.put(message)
|
||||
|
||||
# Set last message variables
|
||||
self.last_colour = colour
|
||||
self.last_prompt = prompt
|
||||
|
||||
|
||||
class ZookeeperLogger(Thread):
|
||||
"""
|
||||
Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other
|
||||
daemon events while the records are written. They will be eventually-consistent
|
||||
"""
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.node = self.config['node']
|
||||
self.max_lines = self.config['node_log_lines']
|
||||
self.queue = Queue()
|
||||
self.zkhandler = None
|
||||
self.start_zkhandler()
|
||||
self.zkhandler.write([(('logs', self.node), '')])
|
||||
Thread.__init__(self, args=(), kwargs=None)
|
||||
|
||||
def start_zkhandler(self):
|
||||
# We must open our own dedicated Zookeeper instance because we can't guarantee one already exists when this starts
|
||||
if self.zkhandler is not None:
|
||||
try:
|
||||
self.zkhandler.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
self.zkhandler = ZKHandler(self.config, logger=None)
|
||||
self.zkhandler.connect(persistent=True)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
message = self.queue.get()
|
||||
self.write_log(message)
|
||||
|
||||
def write_log(self, message):
|
||||
# Log to Zookeeper
|
||||
with self.zkhandler.writelock(('logs.messages', self.node)):
|
||||
logs = deque(self.zkhandler.read(('logs.messages', self.node)).split('\n'), self.max_lines)
|
||||
if not self.config['log_dates']:
|
||||
# We want to log dates here, even if the log_dates config is not set
|
||||
date = '{} - '.format(datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
|
||||
else:
|
||||
date = ''
|
||||
logs.append(f'{date}{message}')
|
||||
self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))])
|
||||
|
Reference in New Issue
Block a user