Compare commits

..

8 Commits

8 changed files with 112 additions and 54 deletions

View File

@ -1 +1 @@
0.9.28
0.9.29

View File

@ -42,6 +42,10 @@ To get started with PVC, please see the [About](https://parallelvirtualcluster.r
## Changelog
#### v0.9.29
* [Node Daemon] Corrects numerous bugs with node logging framework
#### v0.9.28
* [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -25,7 +25,7 @@ import yaml
from distutils.util import strtobool as dustrtobool
# Daemon version
version = '0.9.28'
version = '0.9.29'
# API version
API_VERSION = 1.0

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='pvc',
version='0.9.28',
version='0.9.29',
packages=['pvc', 'pvc.cli_lib'],
install_requires=[
'Click',

View File

@ -23,6 +23,7 @@ from collections import deque
from threading import Thread
from queue import Queue
from datetime import datetime
from time import sleep
from daemon_lib.zkhandler import ZKHandler
@ -83,7 +84,8 @@ class Logger(object):
self.last_prompt = ''
if self.config['zookeeper_logging']:
self.zookeeper_logger = ZookeeperLogger(config)
self.zookeeper_queue = Queue()
self.zookeeper_logger = ZookeeperLogger(self.config, self.zookeeper_queue)
self.zookeeper_logger.start()
# Provide a hup function to close and reopen the writer
@ -96,9 +98,15 @@ class Logger(object):
if self.config['file_logging']:
self.writer.close()
if self.config['zookeeper_logging']:
self.out("Waiting for Zookeeper message queue to drain", state='s')
while not self.zookeeper_logger.queue.empty():
pass
self.out("Waiting 15s for Zookeeper message queue to drain", state='s')
tick_count = 0
while not self.zookeeper_queue.empty():
sleep(0.5)
tick_count += 1
if tick_count > 30:
break
self.zookeeper_logger.stop()
self.zookeeper_logger.join()
@ -145,7 +153,7 @@ class Logger(object):
# Log to Zookeeper
if self.config['zookeeper_logging']:
self.zookeeper_logger.queue.put(message)
self.zookeeper_queue.put(message)
# Set last message variables
self.last_colour = colour
@ -157,19 +165,14 @@ class ZookeeperLogger(Thread):
Defines a threaded writer for Zookeeper logs. Threading prevents the blocking of other
daemon events while the records are written. They will be eventually-consistent.
"""
def __init__(self, config):
def __init__(self, config, zookeeper_queue):
self.config = config
self.node = self.config['node']
self.max_lines = self.config['node_log_lines']
self.queue = Queue()
self.zkhandler = None
self.start_zkhandler()
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
self.zookeeper_queue = zookeeper_queue
self.connected = False
self.running = False
self.zkhandler = None
Thread.__init__(self, args=(), kwargs=None)
def start_zkhandler(self):
@ -179,10 +182,29 @@ class ZookeeperLogger(Thread):
self.zkhandler.disconnect()
except Exception:
pass
self.zkhandler = ZKHandler(self.config, logger=None)
self.zkhandler.connect(persistent=True)
while True:
try:
self.zkhandler = ZKHandler(self.config, logger=None)
self.zkhandler.connect(persistent=True)
break
except Exception:
sleep(0.5)
continue
self.connected = True
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
def run(self):
while not self.connected:
self.start_zkhandler()
sleep(1)
self.running = True
# Get the logs that are currently in Zookeeper and populate our deque
raw_logs = self.zkhandler.read(('logs.messages', self.node))
@ -192,7 +214,7 @@ class ZookeeperLogger(Thread):
while self.running:
# Get a new message
try:
message = self.queue.get(timeout=1)
message = self.zookeeper_queue.get(timeout=1)
if not message:
continue
except Exception:
@ -205,8 +227,21 @@ class ZookeeperLogger(Thread):
date = ''
# Add the message to the deque
logs.append(f'{date}{message}')
# Write the updated messages into Zookeeper
self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))])
tick_count = 0
while True:
try:
# Write the updated messages into Zookeeper
self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))])
break
except Exception:
# The write failed (connection loss, etc.) so retry for 15 seconds
sleep(0.5)
tick_count += 1
if tick_count > 30:
break
else:
continue
return
def stop(self):

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
pvc (0.9.29-0) unstable; urgency=high
* [Node Daemon] Corrects numerous bugs with node logging framework
-- Joshua M. Boniface <joshua@boniface.me> Mon, 19 Jul 2021 16:55:41 -0400
pvc (0.9.28-0) unstable; urgency=high
* [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -42,6 +42,10 @@ To get started with PVC, please see the [About](https://parallelvirtualcluster.r
## Changelog
#### v0.9.29
* [Node Daemon] Corrects numerous bugs with node logging framework
#### v0.9.28
* [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -56,7 +56,7 @@ import pvcnoded.CephInstance as CephInstance
import pvcnoded.MetadataAPIInstance as MetadataAPIInstance
# Version string for startup output
version = '0.9.28'
version = '0.9.29'
###############################################################################
# PVCD - node daemon startup program
@ -76,8 +76,11 @@ version = '0.9.28'
# Daemon functions
###############################################################################
# Ensure the update_timer is None until it's set for real
# Ensure update_timer, this_node, and d_domain are None until they're set for real
# Ensures cleanup() doesn't fail due to these items not being created yet
update_timer = None
this_node = None
d_domain = None
# Create timer to update this node in Zookeeper
@ -110,7 +113,7 @@ try:
pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE']
except Exception:
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcnoded.')
exit(1)
os._exit(1)
# Set local hostname and domain variables
myfqdn = gethostname()
@ -142,7 +145,7 @@ def readConfig(pvcnoded_config_file, myhostname):
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
except Exception as e:
print('ERROR: Failed to parse configuration file: {}'.format(e))
exit(1)
os._exit(1)
# Handle the basic config (hypervisor-only)
try:
@ -179,7 +182,7 @@ def readConfig(pvcnoded_config_file, myhostname):
}
except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e))
exit(1)
cleanup(failure=True)
config = config_general
# Handle debugging config
@ -236,7 +239,7 @@ def readConfig(pvcnoded_config_file, myhostname):
except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e))
exit(1)
cleanup(failure=True)
config = {**config, **config_networking}
# Create the by-id address entries
@ -250,7 +253,7 @@ def readConfig(pvcnoded_config_file, myhostname):
network = ip_network(config[network_key])
except Exception:
print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key))
exit(1)
cleanup(failure=True)
# If we should be autoselected
if config[address_key] == 'by-id':
@ -270,7 +273,7 @@ def readConfig(pvcnoded_config_file, myhostname):
raise
except Exception:
print('ERROR: Floating address {} for {} is not valid!'.format(config[floating_key], floating_key))
exit(1)
cleanup(failure=True)
# Handle the storage config
if config['enable_storage']:
@ -281,7 +284,7 @@ def readConfig(pvcnoded_config_file, myhostname):
}
except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e))
exit(1)
cleanup(failure=True)
config = {**config, **config_storage}
# Handle an empty ipmi_hostname
@ -488,6 +491,9 @@ if enable_networking:
else:
common.run_os_command('ip route add default via {} dev {}'.format(upstream_gateway, 'brupstream'))
logger.out('Waiting 3s for networking to come up', state='s')
time.sleep(3)
###############################################################################
# PHASE 2c - Prepare sysctl for pvcnoded
###############################################################################
@ -559,8 +565,8 @@ if enable_storage:
logger.out('Starting Ceph manager daemon', state='i')
common.run_os_command('systemctl start ceph-mgr@{}'.format(myhostname))
logger.out('Waiting 5s for daemons to start', state='s')
time.sleep(5)
logger.out('Waiting 3s for daemons to start', state='s')
time.sleep(3)
###############################################################################
# PHASE 4 - Attempt to connect to the coordinators and start zookeeper client
@ -575,7 +581,7 @@ try:
zkhandler.connect(persistent=True)
except Exception as e:
logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
exit(1)
os._exit(1)
logger.out('Validating Zookeeper schema', state='i')
@ -696,7 +702,7 @@ else:
# Cleanup function
def cleanup():
def cleanup(failure=False):
global logger, zkhandler, update_timer, d_domain
logger.out('Terminating pvcnoded and cleaning up', state='s')
@ -708,19 +714,19 @@ def cleanup():
# Waiting for any flushes to complete
logger.out('Waiting for any active flushes', state='s')
while this_node.flush_thread is not None:
time.sleep(0.5)
if this_node is not None:
while this_node.flush_thread is not None:
time.sleep(0.5)
# Stop console logging on all VMs
logger.out('Stopping domain console watchers', state='s')
for domain in d_domain:
if d_domain[domain].getnode() == myhostname:
try:
d_domain[domain].console_log_instance.stop()
except NameError:
pass
except AttributeError:
pass
if d_domain is not None:
for domain in d_domain:
if d_domain[domain].getnode() == myhostname:
try:
d_domain[domain].console_log_instance.stop()
except Exception:
pass
# Force into secondary coordinator state if needed
try:
@ -737,13 +743,11 @@ def cleanup():
# Stop keepalive thread
try:
stopKeepaliveTimer()
except NameError:
pass
except AttributeError:
pass
logger.out('Performing final keepalive update', state='s')
node_keepalive()
logger.out('Performing final keepalive update', state='s')
node_keepalive()
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([
@ -763,12 +767,17 @@ def cleanup():
logger.out('Terminated pvc daemon', state='s')
logger.terminate()
os._exit(0)
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum='', frame=''):
cleanup()
cleanup(failure=False)
# Hangup (logrotate) function
@ -868,7 +877,7 @@ if enable_hypervisor:
lv_conn.close()
except Exception as e:
logger.out('ERROR: Failed to connect to Libvirt daemon: {}'.format(e), state='e')
exit(1)
cleanup(failure=True)
###############################################################################
# PHASE 7c - Ensure NFT is running on the local host