Compare commits

..

10 Commits

9 changed files with 128 additions and 56 deletions

View File

@ -1 +1 @@
0.9.28 0.9.30

View File

@ -42,6 +42,14 @@ To get started with PVC, please see the [About](https://parallelvirtualcluster.r
## Changelog ## Changelog
#### v0.9.30
* [Node Daemon] Fixes bug with schema validation
#### v0.9.29
* [Node Daemon] Corrects numerous bugs with node logging framework
#### v0.9.28 #### v0.9.28
* [CLI Client] Revamp confirmation options for "vm modify" command * [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -25,7 +25,7 @@ import yaml
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
# Daemon version # Daemon version
version = '0.9.28' version = '0.9.30'
# API version # API version
API_VERSION = 1.0 API_VERSION = 1.0

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name='pvc', name='pvc',
version='0.9.28', version='0.9.30',
packages=['pvc', 'pvc.cli_lib'], packages=['pvc', 'pvc.cli_lib'],
install_requires=[ install_requires=[
'Click', 'Click',

View File

@ -23,6 +23,7 @@ from collections import deque
from threading import Thread from threading import Thread
from queue import Queue from queue import Queue
from datetime import datetime from datetime import datetime
from time import sleep
from daemon_lib.zkhandler import ZKHandler from daemon_lib.zkhandler import ZKHandler
@ -83,7 +84,8 @@ class Logger(object):
self.last_prompt = '' self.last_prompt = ''
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.zookeeper_logger = ZookeeperLogger(config) self.zookeeper_queue = Queue()
self.zookeeper_logger = ZookeeperLogger(self.config, self.zookeeper_queue)
self.zookeeper_logger.start() self.zookeeper_logger.start()
# Provide a hup function to close and reopen the writer # Provide a hup function to close and reopen the writer
@ -96,9 +98,15 @@ class Logger(object):
if self.config['file_logging']: if self.config['file_logging']:
self.writer.close() self.writer.close()
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.out("Waiting for Zookeeper message queue to drain", state='s') self.out("Waiting 15s for Zookeeper message queue to drain", state='s')
while not self.zookeeper_logger.queue.empty():
pass tick_count = 0
while not self.zookeeper_queue.empty():
sleep(0.5)
tick_count += 1
if tick_count > 30:
break
self.zookeeper_logger.stop() self.zookeeper_logger.stop()
self.zookeeper_logger.join() self.zookeeper_logger.join()
@ -145,7 +153,7 @@ class Logger(object):
# Log to Zookeeper # Log to Zookeeper
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.zookeeper_logger.queue.put(message) self.zookeeper_queue.put(message)
# Set last message variables # Set last message variables
self.last_colour = colour self.last_colour = colour
@ -157,19 +165,14 @@ class ZookeeperLogger(Thread):
Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other
daemon events while the records are written. They will be eventually-consistent daemon events while the records are written. They will be eventually-consistent
""" """
def __init__(self, config): def __init__(self, config, zookeeper_queue):
self.config = config self.config = config
self.node = self.config['node'] self.node = self.config['node']
self.max_lines = self.config['node_log_lines'] self.max_lines = self.config['node_log_lines']
self.queue = Queue() self.zookeeper_queue = zookeeper_queue
self.zkhandler = None self.connected = False
self.start_zkhandler()
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
self.running = False self.running = False
self.zkhandler = None
Thread.__init__(self, args=(), kwargs=None) Thread.__init__(self, args=(), kwargs=None)
def start_zkhandler(self): def start_zkhandler(self):
@ -179,10 +182,29 @@ class ZookeeperLogger(Thread):
self.zkhandler.disconnect() self.zkhandler.disconnect()
except Exception: except Exception:
pass pass
while True:
try:
self.zkhandler = ZKHandler(self.config, logger=None) self.zkhandler = ZKHandler(self.config, logger=None)
self.zkhandler.connect(persistent=True) self.zkhandler.connect(persistent=True)
break
except Exception:
sleep(0.5)
continue
self.connected = True
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
def run(self): def run(self):
while not self.connected:
self.start_zkhandler()
sleep(1)
self.running = True self.running = True
# Get the logs that are currently in Zookeeper and populate our deque # Get the logs that are currently in Zookeeper and populate our deque
raw_logs = self.zkhandler.read(('logs.messages', self.node)) raw_logs = self.zkhandler.read(('logs.messages', self.node))
@ -192,7 +214,7 @@ class ZookeeperLogger(Thread):
while self.running: while self.running:
# Get a new message # Get a new message
try: try:
message = self.queue.get(timeout=1) message = self.zookeeper_queue.get(timeout=1)
if not message: if not message:
continue continue
except Exception: except Exception:
@ -205,8 +227,21 @@ class ZookeeperLogger(Thread):
date = '' date = ''
# Add the message to the deque # Add the message to the deque
logs.append(f'{date}{message}') logs.append(f'{date}{message}')
tick_count = 0
while True:
try:
# Write the updated messages into Zookeeper # Write the updated messages into Zookeeper
self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))]) self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))])
break
except Exception:
# The write failed (connection loss, etc.) so retry for 15 seconds
sleep(0.5)
tick_count += 1
if tick_count > 30:
break
else:
continue
return return
def stop(self): def stop(self):

View File

@ -777,7 +777,7 @@ class ZKSchema(object):
logger.out(f'Key not found: {self.path(kpath)}', state='w') logger.out(f'Key not found: {self.path(kpath)}', state='w')
result = False result = False
for elem in ['logs', 'node', 'domain', 'network', 'osd', 'pool']: for elem in ['node', 'domain', 'network', 'osd', 'pool']:
# First read all the subelements of the key class # First read all the subelements of the key class
for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')): for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
# For each key in the schema for that particular elem # For each key in the schema for that particular elem
@ -856,7 +856,7 @@ class ZKSchema(object):
data = '' data = ''
zkhandler.zk_conn.create(self.path(kpath), data.encode(zkhandler.encoding)) zkhandler.zk_conn.create(self.path(kpath), data.encode(zkhandler.encoding))
for elem in ['logs', 'node', 'domain', 'network', 'osd', 'pool']: for elem in ['node', 'domain', 'network', 'osd', 'pool']:
# First read all the subelements of the key class # First read all the subelements of the key class
for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')): for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
# For each key in the schema for that particular elem # For each key in the schema for that particular elem

12
debian/changelog vendored
View File

@ -1,3 +1,15 @@
pvc (0.9.30-0) unstable; urgency=high
* [Node Daemon] Fixes bug with schema validation
-- Joshua M. Boniface <joshua@boniface.me> Tue, 20 Jul 2021 00:01:45 -0400
pvc (0.9.29-0) unstable; urgency=high
* [Node Daemon] Corrects numerous bugs with node logging framework
-- Joshua M. Boniface <joshua@boniface.me> Mon, 19 Jul 2021 16:55:41 -0400
pvc (0.9.28-0) unstable; urgency=high pvc (0.9.28-0) unstable; urgency=high
* [CLI Client] Revamp confirmation options for "vm modify" command * [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -42,6 +42,14 @@ To get started with PVC, please see the [About](https://parallelvirtualcluster.r
## Changelog ## Changelog
#### v0.9.30
* [Node Daemon] Fixes bug with schema validation
#### v0.9.29
* [Node Daemon] Corrects numerous bugs with node logging framework
#### v0.9.28 #### v0.9.28
* [CLI Client] Revamp confirmation options for "vm modify" command * [CLI Client] Revamp confirmation options for "vm modify" command

View File

@ -56,7 +56,7 @@ import pvcnoded.CephInstance as CephInstance
import pvcnoded.MetadataAPIInstance as MetadataAPIInstance import pvcnoded.MetadataAPIInstance as MetadataAPIInstance
# Version string for startup output # Version string for startup output
version = '0.9.28' version = '0.9.30'
############################################################################### ###############################################################################
# PVCD - node daemon startup program # PVCD - node daemon startup program
@ -76,8 +76,11 @@ version = '0.9.28'
# Daemon functions # Daemon functions
############################################################################### ###############################################################################
# Ensure the update_timer is None until it's set for real # Ensure update_timer, this_node, and d_domain are None until they're set for real
# Ensures cleanup() doesn't fail due to these items not being created yet
update_timer = None update_timer = None
this_node = None
d_domain = None
# Create timer to update this node in Zookeeper # Create timer to update this node in Zookeeper
@ -110,7 +113,7 @@ try:
pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE'] pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE']
except Exception: except Exception:
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcnoded.') print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set before starting pvcnoded.')
exit(1) os._exit(1)
# Set local hostname and domain variables # Set local hostname and domain variables
myfqdn = gethostname() myfqdn = gethostname()
@ -142,7 +145,7 @@ def readConfig(pvcnoded_config_file, myhostname):
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader) o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
except Exception as e: except Exception as e:
print('ERROR: Failed to parse configuration file: {}'.format(e)) print('ERROR: Failed to parse configuration file: {}'.format(e))
exit(1) os._exit(1)
# Handle the basic config (hypervisor-only) # Handle the basic config (hypervisor-only)
try: try:
@ -179,7 +182,7 @@ def readConfig(pvcnoded_config_file, myhostname):
} }
except Exception as e: except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e)) print('ERROR: Failed to load configuration: {}'.format(e))
exit(1) cleanup(failure=True)
config = config_general config = config_general
# Handle debugging config # Handle debugging config
@ -236,7 +239,7 @@ def readConfig(pvcnoded_config_file, myhostname):
except Exception as e: except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e)) print('ERROR: Failed to load configuration: {}'.format(e))
exit(1) cleanup(failure=True)
config = {**config, **config_networking} config = {**config, **config_networking}
# Create the by-id address entries # Create the by-id address entries
@ -250,7 +253,7 @@ def readConfig(pvcnoded_config_file, myhostname):
network = ip_network(config[network_key]) network = ip_network(config[network_key])
except Exception: except Exception:
print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key)) print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key))
exit(1) cleanup(failure=True)
# If we should be autoselected # If we should be autoselected
if config[address_key] == 'by-id': if config[address_key] == 'by-id':
@ -270,7 +273,7 @@ def readConfig(pvcnoded_config_file, myhostname):
raise raise
except Exception: except Exception:
print('ERROR: Floating address {} for {} is not valid!'.format(config[floating_key], floating_key)) print('ERROR: Floating address {} for {} is not valid!'.format(config[floating_key], floating_key))
exit(1) cleanup(failure=True)
# Handle the storage config # Handle the storage config
if config['enable_storage']: if config['enable_storage']:
@ -281,7 +284,7 @@ def readConfig(pvcnoded_config_file, myhostname):
} }
except Exception as e: except Exception as e:
print('ERROR: Failed to load configuration: {}'.format(e)) print('ERROR: Failed to load configuration: {}'.format(e))
exit(1) cleanup(failure=True)
config = {**config, **config_storage} config = {**config, **config_storage}
# Handle an empty ipmi_hostname # Handle an empty ipmi_hostname
@ -488,6 +491,9 @@ if enable_networking:
else: else:
common.run_os_command('ip route add default via {} dev {}'.format(upstream_gateway, 'brupstream')) common.run_os_command('ip route add default via {} dev {}'.format(upstream_gateway, 'brupstream'))
logger.out('Waiting 3s for networking to come up', state='s')
time.sleep(3)
############################################################################### ###############################################################################
# PHASE 2c - Prepare sysctl for pvcnoded # PHASE 2c - Prepare sysctl for pvcnoded
############################################################################### ###############################################################################
@ -559,8 +565,8 @@ if enable_storage:
logger.out('Starting Ceph manager daemon', state='i') logger.out('Starting Ceph manager daemon', state='i')
common.run_os_command('systemctl start ceph-mgr@{}'.format(myhostname)) common.run_os_command('systemctl start ceph-mgr@{}'.format(myhostname))
logger.out('Waiting 5s for daemons to start', state='s') logger.out('Waiting 3s for daemons to start', state='s')
time.sleep(5) time.sleep(3)
############################################################################### ###############################################################################
# PHASE 4 - Attempt to connect to the coordinators and start zookeeper client # PHASE 4 - Attempt to connect to the coordinators and start zookeeper client
@ -575,7 +581,7 @@ try:
zkhandler.connect(persistent=True) zkhandler.connect(persistent=True)
except Exception as e: except Exception as e:
logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e') logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
exit(1) os._exit(1)
logger.out('Validating Zookeeper schema', state='i') logger.out('Validating Zookeeper schema', state='i')
@ -696,7 +702,7 @@ else:
# Cleanup function # Cleanup function
def cleanup(): def cleanup(failure=False):
global logger, zkhandler, update_timer, d_domain global logger, zkhandler, update_timer, d_domain
logger.out('Terminating pvcnoded and cleaning up', state='s') logger.out('Terminating pvcnoded and cleaning up', state='s')
@ -708,18 +714,18 @@ def cleanup():
# Waiting for any flushes to complete # Waiting for any flushes to complete
logger.out('Waiting for any active flushes', state='s') logger.out('Waiting for any active flushes', state='s')
if this_node is not None:
while this_node.flush_thread is not None: while this_node.flush_thread is not None:
time.sleep(0.5) time.sleep(0.5)
# Stop console logging on all VMs # Stop console logging on all VMs
logger.out('Stopping domain console watchers', state='s') logger.out('Stopping domain console watchers', state='s')
if d_domain is not None:
for domain in d_domain: for domain in d_domain:
if d_domain[domain].getnode() == myhostname: if d_domain[domain].getnode() == myhostname:
try: try:
d_domain[domain].console_log_instance.stop() d_domain[domain].console_log_instance.stop()
except NameError: except Exception:
pass
except AttributeError:
pass pass
# Force into secondary coordinator state if needed # Force into secondary coordinator state if needed
@ -737,13 +743,11 @@ def cleanup():
# Stop keepalive thread # Stop keepalive thread
try: try:
stopKeepaliveTimer() stopKeepaliveTimer()
except NameError:
pass
except AttributeError:
pass
logger.out('Performing final keepalive update', state='s') logger.out('Performing final keepalive update', state='s')
node_keepalive() node_keepalive()
except Exception:
pass
# Set stop state in Zookeeper # Set stop state in Zookeeper
zkhandler.write([ zkhandler.write([
@ -763,12 +767,17 @@ def cleanup():
logger.out('Terminated pvc daemon', state='s') logger.out('Terminated pvc daemon', state='s')
logger.terminate() logger.terminate()
os._exit(0) if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function # Termination function
def term(signum='', frame=''): def term(signum='', frame=''):
cleanup() cleanup(failure=False)
# Hangup (logrotate) function # Hangup (logrotate) function
@ -868,7 +877,7 @@ if enable_hypervisor:
lv_conn.close() lv_conn.close()
except Exception as e: except Exception as e:
logger.out('ERROR: Failed to connect to Libvirt daemon: {}'.format(e), state='e') logger.out('ERROR: Failed to connect to Libvirt daemon: {}'.format(e), state='e')
exit(1) cleanup(failure=True)
############################################################################### ###############################################################################
# PHASE 7c - Ensure NFT is running on the local host # PHASE 7c - Ensure NFT is running on the local host