Refactor pvcnoded to reduce Daemon.py size

This commit refactors the pvcnoded component to better adhere to
good programming practices. The previous Daemon.py was a massive file
containing almost 2000 lines of direct, root-level code which was
executed as soon as the module was imported. Not only was this poor
practice, it also resulted in a nigh-unmaintainable file which was hard
even for me to understand.

This refactoring splits a large section of the code from Daemon.py into
separate small modules and functions in the `util/` directory. This will
hopefully make most of the functionality easy to find and modify without
having to dig through a single large file.

Further, the existing subcomponents have been moved to the `objects/`
directory, which clearly separates them from the rest of the daemon code.
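
Roughly, the resulting layout looks like this (a sketch for orientation
only; exact file and directory placement may differ slightly):

```
pvcnoded.py        # thin stub that calls Daemon.entrypoint() (see below)
pvcnoded/
├── Daemon.py      # bulk of the startup logic, now wrapped in entrypoint()
├── util/          # small helper modules split out of Daemon.py
└── objects/       # existing subcomponents (NodeInstance, CephInstance, etc.)
```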

Finally, the Daemon.py code has mostly been moved into a function,
`entrypoint()`, which is then called from the `pvcnoded.py` stub.
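
A minimal sketch of what such a stub looks like (illustrative; the real
stub may differ in details):

```python
#!/usr/bin/env python3
# pvcnoded.py - thin launcher stub (illustrative sketch, not the literal file)

import pvcnoded.Daemon as Daemon

# Everything that previously ran at Daemon.py's root level on import now
# runs only when entrypoint() is called explicitly.
Daemon.entrypoint()
```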

Additionally, most format strings in Daemon.py and the utility files
have been replaced with f-strings, making use of Python 3.6+ features.
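
For example, a typical conversion looks like this (an illustrative
snippet, not a literal line from the diff):

```python
node_name = 'hv1'
domain_state = 'flush'

# Before: str.format()
msg = 'Node {} set to domain state {}'.format(node_name, domain_state)

# After: Python 3.6+ f-string
msg = f'Node {node_name} set to domain state {domain_state}'
```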
Date: 2021-08-21 02:46:11 -04:00
Parent: afdf254297
Commit: afb0359c20
24 changed files with 2667 additions and 2433 deletions

CephInstance.py (new file)

@@ -0,0 +1,428 @@
#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import json
import psutil
import daemon_lib.common as common
class CephOSDInstance(object):
def __init__(self, zkhandler, this_node, osd_id):
self.zkhandler = zkhandler
self.this_node = this_node
self.osd_id = osd_id
self.node = None
self.size = None
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
def watch_osd_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.node:
self.node = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
def watch_osd_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
@staticmethod
def add_osd(zkhandler, logger, node, device, weight):
# We are ready to create a new OSD on this node
logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
try:
# 1. Create an OSD; we do this first so we know what ID will be generated
retcode, stdout, stderr = common.run_os_command('ceph osd create')
if retcode:
print('ceph osd create')
print(stdout)
print(stderr)
raise
osd_id = stdout.rstrip()
# 2. Remove that newly-created OSD
retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
if retcode:
print('ceph osd rm')
print(stdout)
print(stderr)
raise
# 3a. Zap the disk to ensure it is ready to go
logger.out('Zapping disk {}'.format(device), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(device))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 3b. Create the OSD for real
logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm prepare --bluestore --data {device}'.format(
osdid=osd_id,
device=device
)
)
if retcode:
print('ceph-volume lvm prepare')
print(stdout)
print(stderr)
raise
# 4a. Get OSD FSID
logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm list {device}'.format(
osdid=osd_id,
device=device
)
)
osd_fsid = None  # Ensure the variable is defined even if no matching line is found
for line in stdout.split('\n'):
if 'osd fsid' in line:
osd_fsid = line.split()[-1]
if not osd_fsid:
print('ceph-volume lvm list')
print('Could not find OSD fsid in data:')
print(stdout)
print(stderr)
raise
# 4b. Activate the OSD
logger.out('Activating new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm activate --bluestore {osdid} {osdfsid}'.format(
osdid=osd_id,
osdfsid=osd_fsid
)
)
if retcode:
print('ceph-volume lvm activate')
print(stdout)
print(stderr)
raise
# 5. Add it to the crush map
logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph osd crush add osd.{osdid} {weight} root=default host={node}'.format(
osdid=osd_id,
weight=weight,
node=node
)
)
if retcode:
print('ceph osd crush add')
print(stdout)
print(stderr)
raise
time.sleep(0.5)
# 6. Verify it started
retcode, stdout, stderr = common.run_os_command(
'systemctl status ceph-osd@{osdid}'.format(
osdid=osd_id
)
)
if retcode:
print('systemctl status')
print(stdout)
print(stderr)
raise
# 7. Add the new OSD to the list
logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
zkhandler.write([
(('osd', osd_id), ''),
(('osd.node', osd_id), node),
(('osd.device', osd_id), device),
(('osd.stats', osd_id), '{}'),
])
# Log it
logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
return False
@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj):
logger.out('Removing OSD disk {}'.format(osd_id), state='i')
try:
# 1. Verify the OSD is present
retcode, stdout, stderr = common.run_os_command('ceph osd ls')
osd_list = stdout.split('\n')
if osd_id not in osd_list:
logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
return True
# 2. Set the OSD out so it will flush
logger.out('Setting out OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
if retcode:
print('ceph osd out')
print(stdout)
print(stderr)
raise
# 3. Wait for the OSD to flush
logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
osd_string = str()
while True:
try:
retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
dump_string = json.loads(stdout)
for osd in dump_string:
if str(osd['osd']) == osd_id:
osd_string = osd
num_pgs = osd_string['num_pgs']
if num_pgs > 0:
time.sleep(5)
else:
raise
except Exception:
break
# 4. Stop the OSD process and wait for it to be terminated
logger.out('Stopping OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
if retcode:
print('systemctl stop')
print(stdout)
print(stderr)
raise
# FIXME: There has to be a better way to do this /shrug
while True:
is_osd_up = False
# Find if there is a process named ceph-osd with arg '--id {id}'
for p in psutil.process_iter(attrs=['name', 'cmdline']):
if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
is_osd_up = True
# If there isn't, continue
if not is_osd_up:
break
# 5. Determine the block devices
retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
vg_name = stdout.split('/')[-2] # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
pv_block = stdout.strip()
# 6. Zap the volumes
logger.out('Zapping OSD disk with ID {} on {}'.format(osd_id, pv_block), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 7. Purge the OSD from Ceph
logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
if retcode:
print('ceph osd purge')
print(stdout)
print(stderr)
raise
# 8. Delete OSD from ZK
logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
zkhandler.delete(('osd', osd_id), recursive=True)
# Log it
logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
return False
class CephPoolInstance(object):
def __init__(self, zkhandler, this_node, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.name = name
self.pgs = ''
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name))
def watch_pool_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.pgs:
self.pgs = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name))
def watch_pool_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephVolumeInstance(object):
def __init__(self, zkhandler, this_node, pool, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}'))
def watch_volume_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephSnapshotInstance(object):
def __init__(self, zkhandler, this_node, pool, volume, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.volume = volume
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}'))
def watch_snapshot_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
# Primary command function
# This command pipe is only used for OSD adds and removes
def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Add the OSD
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing an OSD
elif command == 'osd_remove':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Remove the OSD
result = CephOSDInstance.remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)

DNSAggregatorInstance.py (new file)

@@ -0,0 +1,522 @@
#!/usr/bin/env python3
# DNSAggregatorInstance.py - Class implementing a DNS aggregator and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import dns.zone
import dns.query
import psycopg2
from threading import Thread, Event
import daemon_lib.common as common
class DNSAggregatorInstance(object):
# Initialization function
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.dns_networks = dict()
self.is_active = False
self.dns_server_daemon = PowerDNSInstance(self)
self.dns_axfr_daemon = AXFRDaemonInstance(self)
# Start up the PowerDNS instance
def start_aggregator(self):
# Restart the SQL connection
self.dns_server_daemon.start()
self.dns_axfr_daemon.start()
self.is_active = True
# Stop the PowerDNS instance
def stop_aggregator(self):
self.is_active = False
self.dns_axfr_daemon.stop()
self.dns_server_daemon.stop()
def add_network(self, network):
self.dns_networks[network] = DNSNetworkInstance(self, network)
self.dns_networks[network].add_network()
self.dns_axfr_daemon.update_networks(self.dns_networks)
def remove_network(self, network):
if self.dns_networks[network]:
self.dns_networks[network].remove_network()
del self.dns_networks[network]
self.dns_axfr_daemon.update_networks(self.dns_networks)
class PowerDNSInstance(object):
# Initialization function
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_server_daemon = None
# Floating upstreams
self.cluster_floatingipaddr, self.cluster_cidrnetmask = self.config['cluster_floating_ip'].split('/')
self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
def start(self):
self.logger.out(
'Starting PowerDNS zone aggregator',
state='i'
)
# Define the PowerDNS config
dns_configuration = [
# Option # Explanation
'--no-config',
'--daemon=no', # Start directly
'--guardian=yes', # Use a guardian
'--disable-syslog=yes', # Log only to stdout (which is then captured)
'--disable-axfr=no', # Allow AXFRs
'--allow-axfr-ips=0.0.0.0/0', # Allow AXFRs to anywhere
'--local-address={},{}'.format(self.cluster_floatingipaddr, self.upstream_floatingipaddr), # Listen on floating IPs
'--local-port=53', # On port 53
'--log-dns-details=on', # Log details
'--loglevel=3', # Log info
'--master=yes', # Enable master mode
'--slave=yes', # Enable slave mode
'--slave-renotify=yes', # Renotify out for our slaved zones
'--version-string=powerdns', # Set the version string
'--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name
'--socket-dir={}'.format(self.config['pdns_dynamic_directory']), # Standard socket directory
'--launch=gpgsql', # Use the PostgreSQL backend
'--gpgsql-host={}'.format(self.config['pdns_postgresql_host']), # PostgreSQL instance
'--gpgsql-port={}'.format(self.config['pdns_postgresql_port']), # Default port
'--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']), # Database name
'--gpgsql-user={}'.format(self.config['pdns_postgresql_user']), # User name
'--gpgsql-password={}'.format(self.config['pdns_postgresql_password']), # User password
'--gpgsql-dnssec=no', # Do DNSSEC elsewhere
]
# Start the pdns process in a thread
self.dns_server_daemon = common.run_os_daemon(
'/usr/sbin/pdns_server {}'.format(
' '.join(dns_configuration)
),
environment=None,
logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory'])
)
if self.dns_server_daemon:
self.logger.out(
'Successfully started PowerDNS zone aggregator',
state='o'
)
def stop(self):
if self.dns_server_daemon:
self.logger.out(
'Stopping PowerDNS zone aggregator',
state='i'
)
# Terminate, then kill
self.dns_server_daemon.signal('term')
time.sleep(0.2)
self.dns_server_daemon.signal('kill')
self.logger.out(
'Successfully stopped PowerDNS zone aggregator',
state='o'
)
class DNSNetworkInstance(object):
# Initialization function
def __init__(self, aggregator, network):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.sql_conn = None
self.network = network
# Add a new network to the aggregator database
def add_network(self):
network_domain = self.network.domain
self.logger.out(
'Adding entry for client domain {}'.format(
network_domain
),
prefix='DNS aggregator',
state='o'
)
# Connect to the database
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Try to access the domains entry
sql_curs.execute(
"SELECT * FROM domains WHERE name=%s",
(network_domain,)
)
results = sql_curs.fetchone()
# If we got back a result, don't try to add the domain to the DB
if results:
write_domain = False
else:
write_domain = True
# Write the domain to the database if we're active
if self.aggregator.is_active and write_domain:
sql_curs.execute(
"INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)",
(network_domain,)
)
self.sql_conn.commit()
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = sql_curs.fetchone()
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
""",
(domain_id, network_domain, 'nsX.{d} root.{d} 1 10800 1800 86400 86400'.format(d=self.config['upstream_domain']), 'SOA', 86400, 0)
)
if self.network.name_servers:
ns_servers = self.network.name_servers
else:
ns_servers = ['pvc-dns.{}'.format(self.config['upstream_domain'])]
for ns_server in ns_servers:
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
""",
(domain_id, network_domain, ns_server, 'NS', 86400, 0)
)
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
# Remove a deleted network from the aggregator database
def remove_network(self):
network_domain = self.network.domain
self.logger.out(
'Removing entry for client domain {}'.format(
network_domain
),
prefix='DNS aggregator',
state='o'
)
# Connect to the database
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Get the domain ID
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = sql_curs.fetchone()
# Delete the domain from the database if we're active
if self.aggregator.is_active and domain_id:
sql_curs.execute(
"DELETE FROM domains WHERE id=%s",
(domain_id,)
)
sql_curs.execute(
"DELETE FROM records WHERE domain_id=%s",
(domain_id,)
)
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
class AXFRDaemonInstance(object):
# Initialization function
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_networks = self.aggregator.dns_networks
self.thread_stopper = Event()
self.thread = None
self.sql_conn = None
def update_networks(self, dns_networks):
self.dns_networks = dns_networks
def start(self):
# Create the thread
self.thread_stopper.clear()
self.thread = Thread(target=self.run, args=(), kwargs={})
# Start a local instance of the SQL connection
# Trying to use the instance from the main DNS Aggregator can result in connection failures
# after the leader transitions
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
# Start the thread
self.thread.start()
def stop(self):
self.thread_stopper.set()
if self.sql_conn:
self.sql_conn.close()
self.sql_conn = None
def run(self):
# Wait for all the DNSMASQ instances to actually start
time.sleep(5)
while not self.thread_stopper.is_set():
# We do this for each network
for network, instance in self.dns_networks.items():
# Set up our SQL cursor
try:
sql_curs = self.sql_conn.cursor()
except Exception:
time.sleep(0.5)
continue
# Set up our basic variables
domain = network.domain
if network.ip4_gateway != 'None':
dnsmasq_ip = network.ip4_gateway
else:
dnsmasq_ip = network.ip6_gateway
#
# Get an AXFR from the dnsmasq instance and list of records
#
try:
axfr = dns.query.xfr(dnsmasq_ip, domain, lifetime=5.0)
z = dns.zone.from_xfr(axfr)
records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
except Exception as e:
if self.config['debug']:
self.logger.out('{} {} ({})'.format(e, dnsmasq_ip, domain), state='d', prefix='dns-aggregator')
continue
# Fix the formatting because it's useless
# reference: ['@ 600 IN SOA . . 4 1200 180 1209600 600\n@ 600 IN NS .', 'test3 600 IN A 10.1.1.203\ntest3 600 IN AAAA 2001:b23e:1113:0:5054:ff:fe5c:f131', etc.]
# We don't really care about dnsmasq's terrible SOA or NS records which are in [0]
string_records = '\n'.join(records_raw[1:])
# Split into individual records
records_new = list()
for element in string_records.split('\n'):
if element:
record = element.split()
# Handle space-containing data elements
if domain not in record[0]:
name = '{}.{}'.format(record[0], domain)
else:
name = record[0]
entry = '{} {} IN {} {}'.format(name, record[1], record[3], ' '.join(record[4:]))
records_new.append(entry)
#
# Get the current zone from the database
#
try:
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(domain,)
)
domain_id = sql_curs.fetchone()
sql_curs.execute(
"SELECT * FROM records WHERE domain_id=%s",
(domain_id,)
)
results = list(sql_curs.fetchall())
if self.config['debug']:
self.logger.out('SQL query results: {}'.format(results), state='d', prefix='dns-aggregator')
except Exception as e:
self.logger.out('ERROR: Failed to obtain DNS records from database: {}'.format(e), state='e')
# Skip this network, since there are no results to compare against
continue
# Fix the formatting because it's useless for comparison
# reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.)
records_old = list()
records_old_ids = list()
if not results:
if self.config['debug']:
self.logger.out('No results found, skipping.', state='d', prefix='dns-aggregator')
continue
for record in results:
# Extract the record fields
r_id = record[0]
r_name = record[2]
r_ttl = record[5]
r_type = record[3]
r_data = record[4]
# Assemble a list element in the same format as the AXFR data
entry = '{} {} IN {} {}'.format(r_name, r_ttl, r_type, r_data)
if self.config['debug']:
self.logger.out('Found record: {}'.format(entry), state='d', prefix='dns-aggregator')
# Skip non-A or AAAA records
if r_type != 'A' and r_type != 'AAAA':
if self.config['debug']:
self.logger.out('Skipping record {}, not A or AAAA: "{}"'.format(entry, r_type), state='d', prefix='dns-aggregator')
continue
records_old.append(entry)
records_old_ids.append(r_id)
records_new.sort()
records_old.sort()
if self.config['debug']:
self.logger.out('New: {}'.format(records_new), state='d', prefix='dns-aggregator')
self.logger.out('Old: {}'.format(records_old), state='d', prefix='dns-aggregator')
# Find the differences between the lists
# Basic check one: are they completely equal
if records_new != records_old:
# Get set elements
in_new = set(records_new)
in_old = set(records_old)
in_new_not_in_old = in_new - in_old
in_old_not_in_new = in_old - in_new
if self.config['debug']:
self.logger.out('New but not old: {}'.format(in_new_not_in_old), state='d', prefix='dns-aggregator')
self.logger.out('Old but not new: {}'.format(in_old_not_in_new), state='d', prefix='dns-aggregator')
# Go through the old list
remove_records = list() # list of database IDs
for i in range(len(records_old)):
record_id = records_old_ids[i]
record = records_old[i]
splitrecord = records_old[i].split()
# If the record is not in the new list, remove it
if record in in_old_not_in_new:
remove_records.append(record_id)
# Go through the new elements
for newrecord in in_new_not_in_old:
splitnewrecord = newrecord.split()
# If there's a name and type match with different content, remove the old one
if splitrecord[0] == splitnewrecord[0] and splitrecord[3] == splitnewrecord[3]:
remove_records.append(record_id)
changed = False
if len(remove_records) > 0:
# Remove the invalid old records
for record_id in remove_records:
if self.config['debug']:
self.logger.out('Removing record: {}'.format(record_id), state='d', prefix='dns-aggregator')
sql_curs.execute(
"DELETE FROM records WHERE id=%s",
(record_id,)
)
changed = True
if len(in_new_not_in_old) > 0:
# Add the new records
for record in in_new_not_in_old:
# [NAME, TTL, 'IN', TYPE, DATA]
record = record.split()
r_name = record[0]
r_ttl = record[1]
r_type = record[3]
r_data = record[4]
if self.config['debug']:
self.logger.out('Add record: {}'.format(r_name), state='d', prefix='dns-aggregator')
try:
sql_curs.execute(
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
(domain_id, r_name, r_ttl, r_type, 0, r_data)
)
changed = True
except psycopg2.IntegrityError as e:
if self.config['debug']:
self.logger.out('Failed to add record due to {}: {}'.format(e, r_name), state='d', prefix='dns-aggregator')
except psycopg2.errors.InFailedSqlTransaction as e:
if self.config['debug']:
self.logger.out('Failed to add record due to {}: {}'.format(e, r_name), state='d', prefix='dns-aggregator')
if changed:
# Increase SOA serial
sql_curs.execute(
"SELECT content FROM records WHERE domain_id=%s AND type='SOA'",
(domain_id,)
)
soa_record = list(sql_curs.fetchone())[0].split()
current_serial = int(soa_record[2])
new_serial = current_serial + 1
soa_record[2] = str(new_serial)
if self.config['debug']:
self.logger.out('Records changed; bumping SOA: {}'.format(new_serial), state='d', prefix='dns-aggregator')
sql_curs.execute(
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
(' '.join(soa_record), domain_id)
)
# Commit all the previous changes
if self.config['debug']:
self.logger.out('Committing database changes and reloading PDNS', state='d', prefix='dns-aggregator')
try:
self.sql_conn.commit()
except Exception as e:
self.logger.out('ERROR: Failed to commit DNS aggregator changes: {}'.format(e), state='e')
# Reload the domain
common.run_os_command(
'/usr/bin/pdns_control --socket-dir={} reload {}'.format(
self.config['pdns_dynamic_directory'],
domain
),
background=False
)
# Wait for 10 seconds
time.sleep(10)

MetadataAPIInstance.py (new file)

@@ -0,0 +1,193 @@
#!/usr/bin/env python3
# MetadataAPIInstance.py - Class implementing an EC2-compatible cloud-init Metadata server
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import gevent.pywsgi
import flask
import sys
import time
import psycopg2
from threading import Thread
from psycopg2.extras import RealDictCursor
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
class MetadataAPIInstance(object):
mdapi = flask.Flask(__name__)
# Initialization function
def __init__(self, zkhandler, config, logger):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.thread = None
self.md_http_server = None
self.add_routes()
# Add flask routes inside our instance
def add_routes(self):
@self.mdapi.route('/', methods=['GET'])
def api_root():
return flask.jsonify({"message": "PVC Provisioner Metadata API version 1"}), 200
@self.mdapi.route('/<version>/meta-data/', methods=['GET'])
def api_metadata_root(version):
metadata = """instance-id\nname\nprofile"""
return metadata, 200
@self.mdapi.route('/<version>/meta-data/instance-id', methods=['GET'])
def api_metadata_instanceid(version):
source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
vm_details = self.get_vm_details(source_address)
instance_id = vm_details.get('uuid', None)
return instance_id, 200
@self.mdapi.route('/<version>/meta-data/name', methods=['GET'])
def api_metadata_hostname(version):
source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
vm_details = self.get_vm_details(source_address)
vm_name = vm_details.get('name', None)
return vm_name, 200
@self.mdapi.route('/<version>/meta-data/profile', methods=['GET'])
def api_metadata_profile(version):
source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
vm_details = self.get_vm_details(source_address)
vm_profile = vm_details.get('profile', None)
return vm_profile, 200
@self.mdapi.route('/<version>/user-data', methods=['GET'])
def api_userdata(version):
source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
vm_details = self.get_vm_details(source_address)
vm_profile = vm_details.get('profile', None)
# Get the userdata
if vm_profile:
userdata = self.get_profile_userdata(vm_profile)
self.logger.out("Returning userdata for profile {}".format(vm_profile), state='i', prefix='Metadata API')
else:
userdata = None
return flask.Response(userdata)
def launch_wsgi(self):
try:
self.md_http_server = gevent.pywsgi.WSGIServer(
('169.254.169.254', 80),
self.mdapi,
log=sys.stdout,
error_log=sys.stdout
)
self.md_http_server.serve_forever()
except Exception as e:
self.logger.out('Error starting Metadata API: {}'.format(e), state='e')
# WSGI start/stop
def start(self):
# Launch Metadata API
self.logger.out('Starting Metadata API at 169.254.169.254:80', state='i')
self.thread = Thread(target=self.launch_wsgi)
self.thread.start()
self.logger.out('Successfully started Metadata API thread', state='o')
def stop(self):
if not self.md_http_server:
return
self.logger.out('Stopping Metadata API at 169.254.169.254:80', state='i')
try:
self.md_http_server.stop()
time.sleep(0.1)
self.md_http_server.close()
time.sleep(0.1)
self.md_http_server = None
self.logger.out('Successfully stopped Metadata API', state='o')
except Exception as e:
self.logger.out('Error stopping Metadata API: {}'.format(e), state='e')
# Helper functions
def open_database(self):
conn = psycopg2.connect(
host=self.config['metadata_postgresql_host'],
port=self.config['metadata_postgresql_port'],
dbname=self.config['metadata_postgresql_dbname'],
user=self.config['metadata_postgresql_user'],
password=self.config['metadata_postgresql_password']
)
cur = conn.cursor(cursor_factory=RealDictCursor)
return conn, cur
def close_database(self, conn, cur):
cur.close()
conn.close()
# Obtain a list of templates
def get_profile_userdata(self, vm_profile):
query = """SELECT userdata.userdata FROM profile
JOIN userdata ON profile.userdata = userdata.id
WHERE profile.name = %s;
"""
args = (vm_profile,)
conn, cur = self.open_database()
cur.execute(query, args)
data_raw = cur.fetchone()
self.close_database(conn, cur)
if data_raw is not None:
data = data_raw.get('userdata', None)
return data
else:
return None
# VM details function
def get_vm_details(self, source_address):
# Start connection to Zookeeper
_discard, networks = pvc_network.get_list(self.zkhandler, None)
# Figure out which server this is via the DHCP address
host_information = dict()
networks_managed = (x for x in networks if x.get('type') == 'managed')
for network in networks_managed:
network_leases = pvc_network.getNetworkDHCPLeases(self.zkhandler, network.get('vni'))
for network_lease in network_leases:
information = pvc_network.getDHCPLeaseInformation(self.zkhandler, network.get('vni'), network_lease)
try:
if information.get('ip4_address', None) == source_address:
host_information = information
except Exception:
pass
# Get our real information on the host; now we can start querying about it
client_macaddr = host_information.get('mac_address', None)
# Find the VM with that MAC address - we can't assume that the hostname is actually right
_discard, vm_list = pvc_vm.get_list(self.zkhandler, None, None, None, None)
vm_details = dict()
for vm in vm_list:
try:
for network in vm.get('networks'):
if network.get('mac', None) == client_macaddr:
vm_details = vm
except Exception:
pass
return vm_details

NodeInstance.py (new file)

@@ -0,0 +1,777 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
from threading import Thread
import daemon_lib.common as common
class NodeInstance(object):
# Initialization function
def __init__(self, name, this_node, zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
# Passed-in variables on creation
self.name = name
self.this_node = this_node
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# Which node is primary
self.primary_node = None
# States
self.daemon_mode = self.zkhandler.read(('node.mode', self.name))
self.daemon_state = 'stop'
self.router_state = 'client'
self.domain_state = 'ready'
# Object lists
self.d_node = d_node
self.d_network = d_network
self.d_domain = d_domain
self.dns_aggregator = dns_aggregator
self.metadata_api = metadata_api
# Printable lists
self.active_node_list = []
self.flushed_node_list = []
self.inactive_node_list = []
self.network_list = []
self.domain_list = []
# Node resources
self.domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Floating IP configurations
if self.config['enable_networking']:
self.upstream_dev = self.config['upstream_dev']
self.upstream_floatingipaddr = self.config['upstream_floating_ip'].split('/')[0]
self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_dev_ip'].split('/')
self.cluster_dev = self.config['cluster_dev']
self.cluster_floatingipaddr = self.config['cluster_floating_ip'].split('/')[0]
self.cluster_ipaddr, self.cluster_cidrnetmask = self.config['cluster_dev_ip'].split('/')
self.storage_dev = self.config['storage_dev']
self.storage_floatingipaddr = self.config['storage_floating_ip'].split('/')[0]
self.storage_ipaddr, self.storage_cidrnetmask = self.config['storage_dev_ip'].split('/')
else:
self.upstream_dev = None
self.upstream_floatingipaddr = None
self.upstream_ipaddr = None
self.upstream_cidrnetmask = None
self.cluster_dev = None
self.cluster_floatingipaddr = None
self.cluster_ipaddr = None
self.cluster_cidrnetmask = None
self.storage_dev = None
self.storage_floatingipaddr = None
self.storage_ipaddr = None
self.storage_cidrnetmask = None
# Threads
self.flush_thread = None
# Flags
self.flush_stopper = False
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.daemon', self.name))
def watch_node_daemonstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'stop'
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.router', self.name))
def watch_node_routerstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'client'
if self.name == self.this_node and self.daemon_mode == 'coordinator':
# We're a coordinator so we care about networking
if data != self.router_state:
self.router_state = data
if self.config['enable_networking']:
if self.router_state == 'takeover':
self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
transition_thread = Thread(target=self.become_primary, args=(), kwargs={})
transition_thread.start()
if self.router_state == 'relinquish':
# Skip becoming secondary unless already running
if self.daemon_state == 'run' or self.daemon_state == 'shutdown':
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
transition_thread = Thread(target=self.become_secondary, args=(), kwargs={})
transition_thread.start()
else:
# We did nothing, so just become secondary state
self.zkhandler.write([
(('node.state.router', self.name), 'secondary')
])
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.state.domain', self.name))
def watch_node_domainstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'unknown'
if data != self.domain_state:
self.domain_state = data
# toggle state management of this node
if self.name == self.this_node:
# Stop any existing flush jobs
if self.flush_thread is not None:
self.logger.out('Waiting for previous migration to complete', state='i')
self.flush_stopper = True
while self.flush_stopper:
time.sleep(0.1)
# Do flushing in a thread so it doesn't block the migrates out
if self.domain_state == 'flush':
self.flush_thread = Thread(target=self.flush, args=(), kwargs={})
self.flush_thread.start()
# Do unflushing in a thread so it doesn't block the migrates in
if self.domain_state == 'unflush':
self.flush_thread = Thread(target=self.unflush, args=(), kwargs={})
self.flush_thread.start()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.free', self.name))
def watch_node_memfree(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.used', self.name))
def watch_node_memused(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.memory.allocated', self.name))
def watch_node_memalloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.vcpu.allocated', self.name))
def watch_node_vcpualloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.running_domains', self.name))
def watch_node_runningdomains(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii').split()
except AttributeError:
data = []
if data != self.domain_list:
self.domain_list = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.count.provisioned_domains', self.name))
def watch_node_domainscount(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.domains_count:
self.domains_count = data
# Update value functions
def update_node_list(self, d_node):
self.d_node = d_node
def update_network_list(self, d_network):
self.d_network = d_network
network_list = []
for network in self.d_network:
network_list.append(d_network[network].vni)
self.network_list = network_list
def update_domain_list(self, d_domain):
self.d_domain = d_domain
######
# Phases of node transition
#
# Current Primary Candidate Secondary
# -> secondary -> primary
#
# def become_secondary() def become_primary()
#
# A ----------------------------------------------------------------- SYNC (candidate)
# B ----------------------------------------------------------------- SYNC (current)
# 1. Stop DNS aggregator ||
# 2. Stop DHCP servers ||
# 2a) network 1 ||
# 2b) network 2 ||
# etc. ||
# 3. Stop client API ||
# 4. Stop metadata API ||
# --
# C ----------------------------------------------------------------- SYNC (candidate)
# 5. Remove upstream floating IP 1. Add upstream floating IP ||
# --
# D ----------------------------------------------------------------- SYNC (candidate)
# 6. Remove cluster floating IP 2. Add cluster floating IP ||
# --
# E ----------------------------------------------------------------- SYNC (candidate)
# 7. Remove metadata floating IP 3. Add metadata floating IP ||
# --
# F ----------------------------------------------------------------- SYNC (candidate)
# 8. Remove gateway IPs 4. Add gateway IPs ||
# 8a) network 1 4a) network 1 ||
# 8b) network 2 4b) network 2 ||
# etc. etc. ||
# --
# G ----------------------------------------------------------------- SYNC (candidate)
# 5. Transition Patroni primary ||
# 6. Start client API ||
# 7. Start metadata API ||
# 8. Start DHCP servers ||
# 8a) network 1 ||
# 8b) network 2 ||
# etc. ||
# 9. Start DNS aggregator ||
# --
######
def become_primary(self):
"""
Acquire primary coordinator status from a peer node
"""
# Lock the primary node until transition is complete
primary_lock = self.zkhandler.exclusivelock('base.config.primary_node')
primary_lock.acquire()
# Ensure our lock key is populated
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
# Synchronize nodes A (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase A', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o')
time.sleep(1) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization phase A', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase A', state='o')
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase B', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o')
self.logger.out('Releasing read lock for synchronization phase B', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase B', state='o')
# Synchronize nodes C (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase C', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o')
time.sleep(0.5) # Time for reader to acquire the lock
# 1. Add Upstream floating IP
self.logger.out(
'Creating floating upstream IP {}/{} on interface {}'.format(
self.upstream_floatingipaddr,
self.upstream_cidrnetmask,
'brupstream'
),
state='o'
)
common.createIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
self.logger.out('Releasing write lock for synchronization phase C', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase C', state='o')
# Synchronize nodes D (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase D', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 2. Add Cluster & Storage floating IP
self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format(
self.cluster_floatingipaddr,
self.cluster_cidrnetmask,
'brcluster'
),
state='o'
)
common.createIPAddress(self.cluster_floatingipaddr, self.cluster_cidrnetmask, 'brcluster')
self.logger.out(
'Creating floating storage IP {}/{} on interface {}'.format(
self.storage_floatingipaddr,
self.storage_cidrnetmask,
'brstorage'
),
state='o'
)
common.createIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
self.logger.out('Releasing write lock for synchronization phase D', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o')
# Synchronize nodes E (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase E', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase E', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 3. Add Metadata link-local IP
self.logger.out(
'Creating Metadata link-local IP {}/{} on interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.createIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing write lock for synchronization phase E', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase E', state='o')
# Synchronize nodes F (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase F', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase F', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 4. Add gateway IPs
for network in self.d_network:
self.d_network[network].createGateways()
self.logger.out('Releasing write lock for synchronization phase F', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase F', state='o')
# Synchronize nodes G (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase G', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase G', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 5. Transition Patroni primary
self.logger.out('Setting Patroni leader to this node', state='i')
tick = 1
patroni_failed = True
# As long as we're in takeover, keep trying to set the Patroni leader to us
while self.router_state == 'takeover':
# Switch Patroni leader to the local instance
retcode, stdout, stderr = common.run_os_command(
"""
patronictl
-c /etc/patroni/config.yml
switchover
--candidate {}
--force
pvc
""".format(self.name)
)
# Combine the stdout and stderr and strip the output
# Patronictl's output is pretty junky
if stderr:
stdout += stderr
stdout = stdout.strip()
# Handle our current Patroni leader being us
if stdout and stdout.split('\n')[-1].split() == ["Error:", "Switchover", "target", "and", "source", "are", "the", "same."]:
self.logger.out('Failed to switch Patroni leader to ourselves; this is fine\n{}'.format(stdout), state='w')
patroni_failed = False
break
# Handle a failed switchover
elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]):
if tick > 4:
self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e')
break
else:
self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout), state='e')
tick += 1
time.sleep(5)
# Otherwise, we succeeded
else:
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
patroni_failed = False
time.sleep(0.2)
break
# 6. Start client API (and provisioner worker)
if self.config['enable_api']:
self.logger.out('Starting PVC API client service', state='i')
common.run_os_command("systemctl enable pvcapid.service")
common.run_os_command("systemctl start pvcapid.service")
self.logger.out('Starting PVC Provisioner Worker service', state='i')
common.run_os_command("systemctl start pvcapid-worker.service")
# 7. Start metadata API; just continue if we fail
self.metadata_api.start()
# 8. Start DHCP servers
for network in self.d_network:
self.d_network[network].startDHCPServer()
# 9. Start DNS aggregator; just continue if we fail
if not patroni_failed:
self.dns_aggregator.start_aggregator()
else:
self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e')
self.logger.out('Releasing write lock for synchronization phase G', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase G', state='o')
# Wait 2 seconds for everything to stabilize before we declare all-done
time.sleep(2)
primary_lock.release()
self.zkhandler.write([
(('node.state.router', self.name), 'primary')
])
self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
def become_secondary(self):
"""
Relinquish primary coordinator status to a peer node
"""
time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase A', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o')
self.logger.out('Releasing read lock for synchronization phase A', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase A', state='o')
# Synchronize nodes B (I am writer)
lock = self.zkhandler.writelock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring write lock for synchronization phase B', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 1. Stop DNS aggregator
self.dns_aggregator.stop_aggregator()
# 2. Stop DHCP servers
for network in self.d_network:
self.d_network[network].stopDHCPServer()
self.logger.out('Releasing write lock for synchronization phase B', state='i')
self.zkhandler.write([
('base.config.primary_node.sync_lock', '')
])
lock.release()
self.logger.out('Released write lock for synchronization phase B', state='o')
# 3. Stop client API
if self.config['enable_api']:
self.logger.out('Stopping PVC API client service', state='i')
common.run_os_command("systemctl stop pvcapid.service")
common.run_os_command("systemctl disable pvcapid.service")
# 4. Stop metadata API
self.metadata_api.stop()
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase C', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o')
# 5. Remove Upstream floating IP
self.logger.out(
'Removing floating upstream IP {}/{} from interface {}'.format(
self.upstream_floatingipaddr,
self.upstream_cidrnetmask,
'brupstream'
),
state='o'
)
common.removeIPAddress(self.upstream_floatingipaddr, self.upstream_cidrnetmask, 'brupstream')
self.logger.out('Releasing read lock for synchronization phase C', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase C', state='o')
# Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase D', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o')
# 6. Remove Cluster & Storage floating IP
self.logger.out(
'Removing floating management IP {}/{} from interface {}'.format(
self.cluster_floatingipaddr,
self.cluster_cidrnetmask,
'brcluster'
),
state='o'
)
common.removeIPAddress(self.cluster_floatingipaddr, self.cluster_cidrnetmask, 'brcluster')
self.logger.out(
'Removing floating storage IP {}/{} from interface {}'.format(
self.storage_floatingipaddr,
self.storage_cidrnetmask,
'brstorage'
),
state='o'
)
common.removeIPAddress(self.storage_floatingipaddr, self.storage_cidrnetmask, 'brstorage')
self.logger.out('Releasing read lock for synchronization phase D', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase D', state='o')
# Synchronize nodes E (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase E', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase E', state='o')
# 7. Remove Metadata link-local IP
self.logger.out(
'Removing Metadata link-local IP {}/{} from interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.removeIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing read lock for synchronization phase E', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase E', state='o')
# Synchronize nodes F (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase F', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase F', state='o')
# 8. Remove gateway IPs
for network in self.d_network:
self.d_network[network].removeGateways()
self.logger.out('Releasing read lock for synchronization phase F', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase F', state='o')
# Synchronize nodes G (I am reader)
lock = self.zkhandler.readlock('base.config.primary_node.sync_lock')
self.logger.out('Acquiring read lock for synchronization phase G', state='i')
try:
lock.acquire(timeout=60) # Don't wait forever and completely block us
self.logger.out('Acquired read lock for synchronization phase G', state='o')
except Exception:
pass
self.logger.out('Releasing read lock for synchronization phase G', state='i')
lock.release()
self.logger.out('Released read lock for synchronization phase G', state='o')
# Wait 2 seconds for everything to stabilize before we declare all-done
time.sleep(2)
self.zkhandler.write([
(('node.state.router', self.name), 'secondary')
])
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
# Flush all VMs on the host
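# Flushing migrates each running VM off this node in serial; any VM without a valid
# migration target is shut down with its autostart flag set so that unflush() can
# start it again on this node later.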
def flush(self):
# Begin flush
self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
self.logger.out('VM list: {}'.format(', '.join(self.domain_list)), state='i')
fixed_domain_list = self.domain_list.copy()
for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node flush', state='i')
self.flush_thread = None
self.flush_stopper = False
return
self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
# Don't replace the previous node if the VM is already migrated
if self.zkhandler.read(('domain.last_node', dom_uuid)):
current_node = self.zkhandler.read(('domain.last_node', dom_uuid))
else:
current_node = self.zkhandler.read(('domain.node', dom_uuid))
target_node = common.findTargetNode(self.zkhandler, dom_uuid)
if target_node == current_node:
target_node = None
if target_node is None:
self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
self.zkhandler.write([
(('domain.state', dom_uuid), 'shutdown'),
(('domain.meta.autostart', dom_uuid), 'True'),
])
else:
self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
self.zkhandler.write([
(('domain.state', dom_uuid), 'migrate'),
(('domain.node', dom_uuid), target_node),
(('domain.last_node', dom_uuid), current_node),
])
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
ticks = 0
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
ticks += 1
if ticks > 600:
# Abort if we've waited 120 seconds; the VM is probably stuck, so just continue
break
time.sleep(0.2)
self.zkhandler.write([
(('node.running_domains', self.name), ''),
(('node.state.domain', self.name), 'flushed'),
])
self.flush_thread = None
self.flush_stopper = False
return
def unflush(self):
self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')
fixed_domain_list = self.d_domain.copy()
for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node unflush', state='i')
self.flush_thread = None
self.flush_stopper = False
return
# Handle autostarts
autostart = self.zkhandler.read(('domain.meta.autostart', dom_uuid))
node = self.zkhandler.read(('domain.node', dom_uuid))
if autostart == 'True' and node == self.name:
self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
self.zkhandler.write([
(('domain.state', dom_uuid), 'start'),
(('domain.node', dom_uuid), self.name),
(('domain.last_node', dom_uuid), ''),
(('domain.meta.autostart', dom_uuid), 'False'),
])
continue
try:
last_node = self.zkhandler.read(('domain.last_node', dom_uuid))
except Exception:
continue
if last_node != self.name:
continue
self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
self.zkhandler.write([
(('domain.state', dom_uuid), 'migrate'),
(('domain.node', dom_uuid), self.name),
(('domain.last_node', dom_uuid), ''),
])
# Wait for the VM to migrate back
while self.zkhandler.read(('domain.state', dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
time.sleep(0.1)
self.zkhandler.write([
(('node.state.domain', self.name), 'ready')
])
self.flush_thread = None
self.flush_stopper = False
return

View File

@ -0,0 +1,210 @@
#!/usr/bin/env python3
# SRIOVVFInstance.py - Class implementing a PVC SR-IOV VF and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
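# Helper to translate a boolean (or its string form from Zookeeper) into the 'on'/'off'
# keywords expected by `ip link set ... vf ...` options,
# e.g. boolToOnOff(True) -> 'on', boolToOnOff('False') -> 'off'.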
def boolToOnOff(state):
if state and str(state) == 'True':
return 'on'
else:
return 'off'
class SRIOVVFInstance(object):
# Initialization function
def __init__(self, vf, zkhandler, config, logger, this_node):
self.vf = vf
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.myhostname = self.this_node.name
self.pf = self.zkhandler.read(('node.sriov.vf', self.myhostname, 'sriov_vf.pf', self.vf))
self.mtu = self.zkhandler.read(('node.sriov.vf', self.myhostname, 'sriov_vf.mtu', self.vf))
self.vfid = self.vf.replace('{}v'.format(self.pf), '')
self.logger.out('Setting MTU to {}'.format(self.mtu), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} mtu {}'.format(self.vf, self.mtu))
# These properties are set via the DataWatch functions, to ensure they are configured on the system
self.mac = None
self.vlan_id = None
self.vlan_qos = None
self.tx_rate_min = None
self.tx_rate_max = None
self.spoof_check = None
self.link_state = None
self.trust = None
self.query_rss = None
# Zookeeper handlers for changed configs
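# Each watcher below follows the same pattern: when its Zookeeper key changes, cache
# the new value and re-apply it to the VF via `ip link`; when the key is deleted the
# watcher returns False so it deregisters before this instance is reaped in Daemon.py.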
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.mac', self.vf))
def watch_vf_mac(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '00:00:00:00:00:00'
if data != self.mac:
self.mac = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.vlan_id', self.vf))
def watch_vf_vlan_id(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '0'
if data != self.vlan_id:
self.vlan_id = data
self.logger.out('Setting vLAN ID to {}'.format(self.vlan_id), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} vlan {} qos {}'.format(self.pf, self.vfid, self.vlan_id, self.vlan_qos))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.vlan_qos', self.vf))
def watch_vf_vlan_qos(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '0'
if data != self.vlan_qos:
self.vlan_qos = data
self.logger.out('Setting vLAN QOS to {}'.format(self.vlan_qos), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} vlan {} qos {}'.format(self.pf, self.vfid, self.vlan_id, self.vlan_qos))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.tx_rate_min', self.vf))
def watch_vf_tx_rate_min(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '0'
if data != self.tx_rate_min:
self.tx_rate_min = data
self.logger.out('Setting minimum TX rate to {}'.format(self.tx_rate_min), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} min_tx_rate {}'.format(self.pf, self.vfid, self.tx_rate_min))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.tx_rate_max', self.vf))
def watch_vf_tx_rate_max(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '0'
if data != self.tx_rate_max:
self.tx_rate_max = data
self.logger.out('Setting maximum TX rate to {}'.format(self.tx_rate_max), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} max_tx_rate {}'.format(self.pf, self.vfid, self.tx_rate_max))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.spoof_check', self.vf))
def watch_vf_spoof_check(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = '0'
if data != self.spoof_check:
self.spoof_check = data
self.logger.out('Setting spoof checking {}'.format(boolToOnOff(self.spoof_check)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} spoofchk {}'.format(self.pf, self.vfid, boolToOnOff(self.spoof_check)))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.link_state', self.vf))
def watch_vf_link_state(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'on'
if data != self.link_state:
self.link_state = data
self.logger.out('Setting link state to {}'.format(self.link_state), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} state {}'.format(self.pf, self.vfid, self.link_state))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.trust', self.vf))
def watch_vf_trust(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'off'
if data != self.trust:
self.trust = data
self.logger.out('Setting trust mode {}'.format(boolToOnOff(self.trust)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} trust {}'.format(self.pf, self.vfid, boolToOnOff(self.trust)))
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.query_rss', self.vf))
def watch_vf_query_rss(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'off'
if data != self.query_rss:
self.query_rss = data
self.logger.out('Setting RSS query ability {}'.format(boolToOnOff(self.query_rss)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
common.run_os_command('ip link set {} vf {} query_rss {}'.format(self.pf, self.vfid, boolToOnOff(self.query_rss)))

View File

@ -0,0 +1,101 @@
#!/usr/bin/env python3
# VMConsoleWatcherInstance.py - Class implementing a console log watcher for PVC domains
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import time
from threading import Thread, Event
from collections import deque
class VMConsoleWatcherInstance(object):
# Initialization function
def __init__(self, domuuid, domname, zkhandler, config, logger, this_node):
self.domuuid = domuuid
self.domname = domname
self.zkhandler = zkhandler
self.config = config
self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
self.console_log_lines = config['console_log_lines']
self.logger = logger
self.this_node = this_node
# Try to append (create) the logfile and set its permissions
open(self.logfile, 'a').close()
os.chmod(self.logfile, 0o600)
try:
self.logdeque = deque(open(self.logfile), self.console_log_lines)
except UnicodeDecodeError:
# There is corruption in the log file; overwrite it
self.logger.out('Failed to decode console log file; clearing existing file', state='w', prefix='Domain {}'.format(self.domuuid))
with open(self.logfile, 'w') as lfh:
lfh.write('\n')
self.logdeque = deque(open(self.logfile), self.console_log_lines)
self.stamp = None
self.cached_stamp = None
# Set up the deque with the current contents of the log
self.last_loglines = None
self.loglines = None
# Thread options
self.thread = None
self.thread_stopper = Event()
# Start execution thread
def start(self):
self.thread_stopper.clear()
self.thread = Thread(target=self.run, args=(), kwargs={})
self.logger.out('Starting VM log parser', state='i', prefix='Domain {}'.format(self.domuuid))
self.thread.start()
# Stop execution thread
def stop(self):
if self.thread and self.thread.is_alive():
self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}'.format(self.domuuid))
self.thread_stopper.set()
# Do one final flush
self.update()
# Main entrypoint
def run(self):
# Main loop
while not self.thread_stopper.is_set():
self.update()
time.sleep(0.5)
def update(self):
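# Only re-read the console log when the file's mtime has changed; the cached
# stamp keeps an idle console from being re-parsed on every 0.5s tick.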
self.stamp = os.stat(self.logfile).st_mtime
if self.stamp != self.cached_stamp:
self.cached_stamp = self.stamp
self.fetch_lines()
# Update Zookeeper with the new loglines if they changed
if self.loglines != self.last_loglines:
self.zkhandler.write([
(('domain.console.log', self.domuuid), self.loglines)
])
self.last_loglines = self.loglines
def fetch_lines(self):
self.logdeque = deque(open(self.logfile), self.console_log_lines)
self.loglines = ''.join(self.logdeque)

View File

@ -0,0 +1,880 @@
#!/usr/bin/env python3
# VMInstance.py - Class implementing a PVC virtual machine in pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import uuid
import time
import libvirt
import json
from threading import Thread
from xml.etree import ElementTree
import daemon_lib.common as common
import pvcnoded.objects.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import daemon_lib.common as daemon_common
class VMInstance(object):
# Initialization function
def __init__(self, domuuid, zkhandler, config, logger, this_node):
# Passed-in variables on creation
self.domuuid = domuuid
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
# Get data from zookeeper
self.domname = self.zkhandler.read(('domain', domuuid))
self.state = self.zkhandler.read(('domain.state', domuuid))
self.node = self.zkhandler.read(('domain.node', domuuid))
self.lastnode = self.zkhandler.read(('domain.last_node', domuuid))
self.last_currentnode = self.zkhandler.read(('domain.node', domuuid))
self.last_lastnode = self.zkhandler.read(('domain.last_node', domuuid))
try:
self.migration_method = self.zkhandler.read(('domain.meta.migrate_method', self.domuuid))
except Exception:
self.migration_method = 'none'
# These will all be set later
self.instart = False
self.inrestart = False
self.inmigrate = False
self.inreceive = False
self.inshutdown = False
self.instop = False
# Libvirt domuuid
self.dom = self.lookupByUUID(self.domuuid)
# Log watcher instance
self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zkhandler, self.config, self.logger, self.this_node)
# Watch for changes to the state field in Zookeeper
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('domain.state', self.domuuid))
def watch_state(data, stat, event=""):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Perform a management command
self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
state_thread.start()
# Get data functions
def getstate(self):
return self.state
def getnode(self):
return self.node
def getlastnode(self):
return self.lastnode
def getdom(self):
return self.dom
def getmemory(self):
try:
if self.dom is not None:
memory = int(self.dom.info()[2] / 1024)
else:
domain_information = daemon_common.getInformationFromXML(self.zkhandler, self.domuuid)
memory = int(domain_information['memory'])
except Exception:
memory = 0
return memory
def getvcpus(self):
try:
vcpus = int(self.dom.info()[3])
except Exception:
vcpus = 0
return vcpus
# Manage local node domain_list
def addDomainToList(self):
if self.domuuid not in self.this_node.domain_list:
try:
# Add the domain to the domain_list array
self.this_node.domain_list.append(self.domuuid)
# Push the change up to Zookeeper
self.zkhandler.write([
(('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list))
])
except Exception as e:
self.logger.out('Error adding domain to list: {}'.format(e), state='e')
def removeDomainFromList(self):
if self.domuuid in self.this_node.domain_list:
try:
# Remove the domain from the domain_list array
self.this_node.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper
self.zkhandler.write([
(('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list))
])
except Exception as e:
self.logger.out('Error removing domain from list: {}'.format(e), state='e')
# Update the VNC live data
def update_vnc(self):
if self.dom is not None:
live_xml = ElementTree.fromstring(self.dom.XMLDesc(0))
graphics = live_xml.find('./devices/graphics')
if graphics is not None:
self.logger.out('Updating VNC data', state='i', prefix='Domain {}'.format(self.domuuid))
port = graphics.get('port', '')
listen = graphics.get('listen', '')
self.zkhandler.write([
(('domain.console.vnc', self.domuuid), '{}:{}'.format(listen, port))
])
else:
self.zkhandler.write([
(('domain.console.vnc', self.domuuid), '')
])
else:
self.zkhandler.write([
(('domain.console.vnc', self.domuuid), '')
])
# Start up the VM
def start_vm(self):
# Start the log watcher
self.console_log_instance.start()
self.logger.out('Starting VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
self.instart = False
return
# Try to get the current state in case it's already running
try:
self.dom = self.lookupByUUID(self.domuuid)
curstate = self.dom.state()[0]
except Exception:
curstate = 'notstart'
# Handle situations where the VM crashed or the node unexpectedly rebooted
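# A dead node may still hold RBD exclusive-locks on the VM's volumes, so flush
# them before starting; if the flush marks the domain as failed, abort the start.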
if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
# Flush locks
self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
VMInstance.flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node)
if self.zkhandler.read(('domain.state', self.domuuid)) == 'fail':
lv_conn.close()
self.dom = None
self.instart = False
return
if curstate == libvirt.VIR_DOMAIN_RUNNING:
# If it is running just update the model
self.addDomainToList()
self.zkhandler.write([
(('domain.failed_reason', self.domuuid), '')
])
else:
# Or try to create it
try:
# Grab the domain information from Zookeeper
xmlconfig = self.zkhandler.read(('domain.xml', self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList()
self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = dom
self.zkhandler.write([
(('domain.failed_reason', self.domuuid), '')
])
except libvirt.libvirtError as e:
self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid))
self.zkhandler.write([
(('domain.state', self.domuuid), 'fail'),
(('domain.failed_reason', self.domuuid), str(e))
])
lv_conn.close()
self.dom = None
self.instart = False
return
lv_conn.close()
self.instart = False
# Restart the VM
def restart_vm(self):
self.logger.out('Restarting VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.inrestart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
self.inrestart = False
return
self.shutdown_vm()
time.sleep(0.2)
self.start_vm()
self.addDomainToList()
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
lv_conn.close()
self.inrestart = False
# Stop the VM forcibly without updating state
def terminate_vm(self):
self.logger.out('Terminating VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
time.sleep(0.2)
try:
if self.getdom().state()[0] == libvirt.VIR_DOMAIN_RUNNING:
# It didn't terminate, try again
self.dom.destroy()
except libvirt.libvirtError:
pass
except AttributeError:
self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}'.format(self.domuuid))
self.removeDomainFromList()
self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Stop the VM forcibly
def stop_vm(self):
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
time.sleep(0.2)
try:
if self.getdom().state()[0] == libvirt.VIR_DOMAIN_RUNNING:
# It didn't terminate, try again
self.dom.destroy()
except libvirt.libvirtError:
pass
except AttributeError:
self.logger.out('Failed to stop VM', state='e', prefix='Domain {}'.format(self.domuuid))
self.removeDomainFromList()
if self.inrestart is False:
self.zkhandler.write([
(('domain.state', self.domuuid), 'stop')
])
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Shutdown the VM gracefully
def shutdown_vm(self):
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
is_aborted = False
self.inshutdown = True
self.dom.shutdown()
tick = 0
while True:
tick += 1
time.sleep(1)
# Abort shutdown if the state changes to start
current_state = self.zkhandler.read(('domain.state', self.domuuid))
if current_state not in ['shutdown', 'restart', 'migrate']:
self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid))
is_aborted = True
break
try:
lvdomstate = self.dom.state()[0]
except Exception:
lvdomstate = None
if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
self.removeDomainFromList()
self.zkhandler.write([
(('domain.state', self.domuuid), 'stop')
])
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
# Stop the log watcher
self.console_log_instance.stop()
break
if tick >= self.config['vm_shutdown_timeout']:
self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid))
self.zkhandler.write([
(('domain.state', self.domuuid), 'stop')
])
break
self.inshutdown = False
if is_aborted:
self.manage_vm_state()
if self.inrestart:
# Wait to prevent race conditions
time.sleep(1)
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
# Migrate the VM to a target host
def migrate_vm(self, force_live=False, force_shutdown=False):
# Wait for any previous migration
while self.inmigrate:
time.sleep(0.1)
if self.migration_method == 'live':
force_live = True
elif self.migration_method == 'shutdown':
force_shutdown = True
self.inmigrate = True
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
# Used for sanity checking later
target_node = self.zkhandler.read(('domain.node', self.domuuid))
aborted = False
def abort_migrate(reason):
self.zkhandler.write([
(('domain.state', self.domuuid), 'start'),
(('domain.node', self.domuuid), self.this_node.name),
(('domain.last_node', self.domuuid), self.last_lastnode)
])
migrate_lock_node.release()
migrate_lock_state.release()
self.inmigrate = False
self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid))
# Acquire exclusive lock on the domain node key
migrate_lock_node = self.zkhandler.exclusivelock(('domain.node', self.domuuid))
migrate_lock_state = self.zkhandler.exclusivelock(('domain.state', self.domuuid))
migrate_lock_node.acquire()
migrate_lock_state.acquire()
time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Don't try to migrate a VM to its own node; set back to start
if self.node == self.lastnode or self.node == self.this_node.name:
abort_migrate('Target node matches the current active node during initial check')
return
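# The sender and receiver coordinate via domain.migrate.sync_lock: this (sending)
# side is the reader in phases A and D and the writer in phases B (live migration)
# and C (shutdown migration), while the receiving side holds the opposite lock in
# each phase and clears the sync key once it has verified the domain.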
# Synchronize nodes A (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
ticks = 0
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
time.sleep(0.1)
ticks += 1
if ticks > 300:
self.logger.out('Timed out waiting 30s for peer', state='e', prefix='Domain {}'.format(self.domuuid))
aborted = True
break
self.logger.out('Releasing read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if aborted:
abort_migrate('Timed out waiting for peer')
return
# Synchronize nodes B (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
def migrate_live():
self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
# Set up destination connection
dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
try:
self.logger.out('Opening remote libvirt connection', state='i', prefix='Domain {}'.format(self.domuuid))
# Open a connection to the destination
dest_lv_conn = libvirt.open(dest_lv)
if not dest_lv_conn:
raise
except Exception:
self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}'.format(self.domuuid))
return False
try:
self.logger.out('Live migrating VM', state='i', prefix='Domain {}'.format(self.domuuid))
# Send the live migration; force the destination URI to ensure we transit over the cluster network
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
if not target_dom:
raise
except Exception as e:
self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}'.format(self.domuuid))
dest_lv_conn.close()
return False
self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
dest_lv_conn.close()
self.console_log_instance.stop()
self.removeDomainFromList()
return True
def migrate_shutdown():
self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid))
self.shutdown_vm()
return True
do_migrate_shutdown = False
migrate_live_result = False
# Do a final verification
if self.node == self.lastnode or self.node == self.this_node.name:
abort_migrate('Target node matches the current active node during final check')
return
if self.node != target_node:
abort_migrate('Target node changed during preparation')
return
if not force_shutdown:
# A live migrate is attempted 3 times in succession
ticks = 0
while True:
ticks += 1
self.logger.out('Attempting live migration try {}'.format(ticks), state='i', prefix='Domain {}'.format(self.domuuid))
migrate_live_result = migrate_live()
if migrate_live_result:
break
time.sleep(0.5)
if ticks > 2:
break
else:
migrate_live_result = False
if not migrate_live_result:
if force_live:
self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid))
aborted = True
else:
do_migrate_shutdown = True
self.logger.out('Releasing write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
if aborted:
abort_migrate('Live migration failed and is required')
return
# Synchronize nodes C (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
if do_migrate_shutdown:
migrate_shutdown()
self.logger.out('Releasing write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
# Wait for the receive side to complete before we declare all-done and release locks
ticks = 0
while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '':
time.sleep(0.1)
ticks += 1
if ticks > 100:
self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid))
break
migrate_lock_node.release()
migrate_lock_state.release()
self.inmigrate = False
return
# Receive the migration from another host
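# Mirror of migrate_vm() above: the receiving side populates the sync lock key, is
# the writer in phases A and D and the reader in B and C, and clears the key once
# the migrated domain has been verified (or the failure recorded) locally.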
def receive_migrate(self):
# Wait for any previous migration
while self.inreceive:
time.sleep(0.1)
self.inreceive = True
self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid))
# Short delay to ensure sender is in sync
time.sleep(0.5)
# Ensure our lock key is populated
self.zkhandler.write([
(('domain.migrate.sync_lock', self.domuuid), self.domuuid)
])
# Synchronize nodes A (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(1) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes C (I am reader)
lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Set the updated data
self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am writer)
lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.5) # Time for reader to acquire the lock
self.state = self.zkhandler.read(('domain.state', self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
if self.dom:
lvdomstate = self.dom.state()[0]
if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
# VM has been received and started
self.addDomainToList()
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
else:
# The receive somehow failed
self.zkhandler.write([
(('domain.state', self.domuuid), 'fail'),
(('domain.failed_reason', self.domuuid), 'Failed to receive migration')
])
self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}'.format(self.domuuid))
else:
if self.node == self.this_node.name:
if self.state in ['start']:
# The receive was aborted
self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid))
elif self.state in ['stop']:
# The send was shutdown-based
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
else:
# The send failed or was aborted
self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
self.zkhandler.write([
(('domain.migrate.sync_lock', self.domuuid), '')
])
self.inreceive = False
return
#
# Main function to manage a VM (taking only self)
#
def manage_vm_state(self):
# Update the current values from zookeeper
self.state = self.zkhandler.read(('domain.state', self.domuuid))
self.node = self.zkhandler.read(('domain.node', self.domuuid))
self.lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
self.migration_method = self.zkhandler.read(('domain.meta.migrate_method', self.domuuid))
# Check the current state of the VM
try:
if self.dom is not None:
running, reason = self.dom.state()
else:
raise
except Exception:
running = libvirt.VIR_DOMAIN_NOSTATE
self.logger.out('VM state change for "{}": {} {}'.format(self.domuuid, self.state, self.node), state='i')
#######################
# Handle state changes
#######################
# Valid states are:
# start
# migrate
# migrate-live
# restart
# shutdown
# stop
# States we don't (need to) handle are:
# disable
# provision
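# Decision tree: skip if an action is already in flight; if the VM is assigned to
# this node, reconcile the libvirt state with the requested state (start, restart,
# shutdown, stop, or receive a migration); otherwise migrate it away or, for any
# other state, terminate it here.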
# Conditional pass one - Are we already performing an action
if self.instart is False \
and self.inrestart is False \
and self.inmigrate is False \
and self.inreceive is False \
and self.inshutdown is False \
and self.instop is False:
# Conditional pass two - Is this VM configured to run on this node
if self.node == self.this_node.name:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM is already running and should be
if self.state == "start":
# Start the log watcher
self.console_log_instance.start()
# Add domain to running list
self.addDomainToList()
# VM is already running and should be but stuck in migrate state
elif self.state == "migrate" or self.state == "migrate-live":
# Start the log watcher
self.console_log_instance.start()
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
# Add domain to running list
self.addDomainToList()
# VM should be restarted
elif self.state == "restart":
self.restart_vm()
# VM should be shut down
elif self.state == "shutdown":
self.shutdown_vm()
# VM should be stopped
elif self.state == "stop":
self.stop_vm()
else:
# VM should be started
if self.state == "start":
# Start the domain
self.start_vm()
# VM should be migrated to this node
elif self.state == "migrate" or self.state == "migrate-live":
# Receive the migration
self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running)
if self.state == "restart":
self.zkhandler.write([
(('domain.state', self.domuuid), 'start')
])
# VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
# VM should be stopped; ensure it's gone from this node's domain_list
elif self.state == "stop":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
# Update the VNC information
self.update_vnc()
else:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM should be migrated away from this node
if self.state == "migrate":
self.migrate_vm(force_live=False)
# VM should be migrated away from this node, live only (no shutdown fallback)
elif self.state == "migrate-live":
self.migrate_vm(force_live=True)
# VM should be shutdown gracefully
elif self.state == 'shutdown':
self.shutdown_vm()
# VM should be forcibly terminated
else:
self.terminate_vm()
# This function is a wrapper for libvirt.lookupByUUID which fixes some problems
# 1. Takes a text UUID and handles converting it to bytes
# 2. Tries the lookup and returns None instead of raising if it fails
def lookupByUUID(self, tuuid):
# Don't do anything if the VM shouldn't live on this node
if self.node != self.this_node.name:
return None
lv_conn = None
libvirt_name = "qemu:///system"
# Convert the text UUID to bytes
buuid = uuid.UUID(tuuid).bytes
# Try
try:
# Open a libvirt connection
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
return None
# Lookup the UUID
dom = lv_conn.lookupByUUID(buuid)
# Fail
except Exception:
dom = None
# After everything
finally:
# Close the libvirt connection
if lv_conn is not None:
lv_conn.close()
# Return the dom object (or None)
return dom
# Flush the locks of a VM based on UUID
@staticmethod
def flush_locks(zkhandler, logger, dom_uuid, this_node=None):
logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
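# For each of the VM's RBD volumes, list any client locks and free them; a lock held
# by another host is only released when the VM is stopped (or no node context is
# supplied), otherwise the domain is marked failed for manual intervention.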
# Get the list of RBD images
rbd_list = zkhandler.read(('domain.storage.volumes', dom_uuid)).split(',')
for rbd in rbd_list:
# Check if a lock exists
lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
if lock_list_retcode != 0:
logger.out('Failed to obtain lock list for volume "{}"'.format(rbd), state='e')
continue
try:
lock_list = json.loads(lock_list_stdout)
except Exception as e:
logger.out('Failed to parse lock list for volume "{}": {}'.format(rbd, e), state='e')
continue
# If there's at least one lock
if lock_list:
# Loop through the locks
for lock in lock_list:
if this_node is not None and zkhandler.read(('domain.state', dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr:
logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0]), state='e')
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)),
])
break
# Free the lock
lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
if lock_remove_retcode != 0:
logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)),
])
break
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
return True
# Primary command function
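# Commands arrive on the base.cmd.domain key as '<command> <args>'. The node that
# currently owns the target VM takes a write lock on the key, performs the action,
# writes back a success-/failure- prefixed result, and holds the lock briefly so the
# issuing client can observe the outcome.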
def vm_command(zkhandler, logger, this_node, data):
# Get the command and args
command, args = data.split()
# Flushing VM RBD locks
if command == 'flush_locks':
dom_uuid = args
# Verify that the VM is set to run on this node
if this_node.d_domain[dom_uuid].getnode() == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.domain')
with zk_lock:
# Flush the lock
result = VMInstance.flush_locks(zkhandler, logger, dom_uuid, this_node)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'failure-{}'.format(data))
])
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)

View File

@ -0,0 +1,847 @@
#!/usr/bin/env python3
# VXNetworkInstance.py - Class implementing a PVC VM network and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import time
from textwrap import dedent
import daemon_lib.common as common
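# A VXNetworkInstance represents one PVC client network on this node. 'bridged'
# networks effectively attach a per-VNI VLAN on the configured bridge device to a
# local bridge (vlanNNN/vmbrNNN), while 'managed' networks build a VXLAN over the
# cluster network and add gateway addresses, dnsmasq DHCP/DNS, and per-network
# nftables rules when this node is the primary coordinator.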
class VXNetworkInstance(object):
# Initialization function
def __init__(self, vni, zkhandler, config, logger, this_node, dns_aggregator):
self.vni = vni
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.dns_aggregator = dns_aggregator
self.cluster_dev = config['cluster_dev']
self.cluster_mtu = config['cluster_mtu']
self.bridge_dev = config['bridge_dev']
self.nettype = self.zkhandler.read(('network.type', self.vni))
if self.nettype == 'bridged':
self.logger.out(
'Creating new bridged network',
prefix='VNI {}'.format(self.vni),
state='o'
)
self.init_bridged()
elif self.nettype == 'managed':
self.logger.out(
'Creating new managed network',
prefix='VNI {}'.format(self.vni),
state='o'
)
self.init_managed()
else:
self.logger.out(
'Invalid network type {}'.format(self.nettype),
prefix='VNI {}'.format(self.vni),
state='o'
)
pass
# Initialize a bridged network
def init_bridged(self):
self.old_description = None
self.description = None
self.vlan_nic = 'vlan{}'.format(self.vni)
self.bridge_nic = 'vmbr{}'.format(self.vni)
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network', self.vni))
def watch_network_description(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.description != data.decode('ascii'):
self.old_description = self.description
self.description = data.decode('ascii')
self.createNetworkBridged()
# Initialize a managed network
def init_managed(self):
self.old_description = None
self.description = None
self.domain = None
self.name_servers = None
self.ip6_gateway = self.zkhandler.read(('network.ip6.gateway', self.vni))
self.ip6_network = self.zkhandler.read(('network.ip6.network', self.vni))
self.ip6_cidrnetmask = self.zkhandler.read(('network.ip6.network', self.vni)).split('/')[-1]
self.dhcp6_flag = self.zkhandler.read(('network.ip6.dhcp', self.vni))
self.ip4_gateway = self.zkhandler.read(('network.ip4.gateway', self.vni))
self.ip4_network = self.zkhandler.read(('network.ip4.network', self.vni))
self.ip4_cidrnetmask = self.zkhandler.read(('network.ip4.network', self.vni)).split('/')[-1]
self.dhcp4_flag = self.zkhandler.read(('network.ip4.dhcp', self.vni))
self.dhcp4_start = self.zkhandler.read(('network.ip4.dhcp_start', self.vni))
self.dhcp4_end = self.zkhandler.read(('network.ip4.dhcp_end', self.vni))
self.vxlan_nic = 'vxlan{}'.format(self.vni)
self.bridge_nic = 'vmbr{}'.format(self.vni)
self.nftables_netconf_filename = '{}/networks/{}.nft'.format(self.config['nft_dynamic_directory'], self.vni)
self.firewall_rules = []
self.dhcp_server_daemon = None
self.dnsmasq_hostsdir = '{}/{}'.format(self.config['dnsmasq_dynamic_directory'], self.vni)
self.dhcp_reservations = []
# Create the network hostsdir
common.run_os_command(
'/bin/mkdir --parents {}'.format(
self.dnsmasq_hostsdir
)
)
self.firewall_rules_base = """# Rules for network {vxlannic}
add chain inet filter {vxlannic}-in
add chain inet filter {vxlannic}-out
add rule inet filter {vxlannic}-in counter
add rule inet filter {vxlannic}-out counter
# Allow ICMP traffic into the router from network
add rule inet filter input ip protocol icmp meta iifname {bridgenic} counter accept
add rule inet filter input ip6 nexthdr icmpv6 meta iifname {bridgenic} counter accept
# Allow DNS, DHCP, and NTP traffic into the router from network
add rule inet filter input tcp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 67 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 123 meta iifname {bridgenic} counter accept
add rule inet filter input ip6 nexthdr udp udp dport 547 meta iifname {bridgenic} counter accept
# Allow metadata API into the router from network
add rule inet filter input tcp dport 80 meta iifname {bridgenic} counter accept
# Block traffic into the router from network
add rule inet filter input meta iifname {bridgenic} counter drop
""".format(
vxlannic=self.vxlan_nic,
bridgenic=self.bridge_nic
)
self.firewall_rules_v4 = """# Jump from forward chain to this chain when matching net (IPv4)
add rule inet filter forward ip daddr {netaddr4} counter jump {vxlannic}-in
add rule inet filter forward ip saddr {netaddr4} counter jump {vxlannic}-out
""".format(
netaddr4=self.ip4_network,
vxlannic=self.vxlan_nic,
)
self.firewall_rules_v6 = """# Jump from forward chain to this chain when matching net (IPv6)
add rule inet filter forward ip6 daddr {netaddr6} counter jump {vxlannic}-in
add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
""".format(
netaddr6=self.ip6_network,
vxlannic=self.vxlan_nic,
)
self.firewall_rules_in = self.zkhandler.children(('network.rule.in', self.vni))
self.firewall_rules_out = self.zkhandler.children(('network.rule.out', self.vni))
# Zookeeper handlers for changed states
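# Most watchers below share one pattern: update the cached value for the changed key
# and, if a dnsmasq instance is currently serving this network, restart it
# (stopDHCPServer/startDHCPServer) so the new configuration takes effect.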
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network', self.vni))
def watch_network_description(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.description != data.decode('ascii'):
self.old_description = self.description
self.description = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.domain', self.vni))
def watch_network_domain(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.domain != data.decode('ascii'):
domain = data.decode('ascii')
if self.dhcp_server_daemon:
self.dns_aggregator.remove_network(self)
self.domain = domain
if self.dhcp_server_daemon:
self.dns_aggregator.add_network(self)
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.nameservers', self.vni))
def watch_network_name_servers(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.name_servers != data.decode('ascii'):
name_servers = data.decode('ascii').split(',')
if self.dhcp_server_daemon:
self.dns_aggregator.remove_network(self)
self.name_servers = name_servers
if self.dhcp_server_daemon:
self.dns_aggregator.add_network(self)
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip6.network', self.vni))
def watch_network_ip6_network(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip6_network != data.decode('ascii'):
ip6_network = data.decode('ascii')
self.ip6_network = ip6_network
self.ip6_cidrnetmask = ip6_network.split('/')[-1]
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip6.gateway', self.vni))
def watch_network_gateway6(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip6_gateway != data.decode('ascii'):
orig_gateway = self.ip6_gateway
if self.this_node.router_state in ['primary', 'takeover']:
if orig_gateway:
self.removeGateway6Address()
self.ip6_gateway = data.decode('ascii')
if self.this_node.router_state in ['primary', 'takeover']:
self.createGateway6Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip6.dhcp', self.vni))
def watch_network_dhcp6_status(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp6_flag != (data.decode('ascii') == 'True'):
self.dhcp6_flag = (data.decode('ascii') == 'True')
if self.dhcp6_flag and not self.dhcp_server_daemon and self.this_node.router_state in ['primary', 'takeover']:
self.startDHCPServer()
elif self.dhcp_server_daemon and not self.dhcp4_flag and self.this_node.router_state in ['primary', 'takeover']:
self.stopDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip4.network', self.vni))
def watch_network_ip4_network(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip4_network != data.decode('ascii'):
ip4_network = data.decode('ascii')
self.ip4_network = ip4_network
self.ip4_cidrnetmask = ip4_network.split('/')[-1]
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip4.gateway', self.vni))
def watch_network_gateway4(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip4_gateway != data.decode('ascii'):
orig_gateway = self.ip4_gateway
if self.this_node.router_state in ['primary', 'takeover']:
if orig_gateway:
self.removeGateway4Address()
self.ip4_gateway = data.decode('ascii')
if self.this_node.router_state in ['primary', 'takeover']:
self.createGateway4Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip4.dhcp', self.vni))
def watch_network_dhcp4_status(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_flag != (data.decode('ascii') == 'True'):
self.dhcp4_flag = (data.decode('ascii') == 'True')
if self.dhcp4_flag and not self.dhcp_server_daemon and self.this_node.router_state in ['primary', 'takeover']:
self.startDHCPServer()
elif self.dhcp_server_daemon and not self.dhcp6_flag and self.this_node.router_state in ['primary', 'takeover']:
self.stopDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip4.dhcp_start', self.vni))
def watch_network_dhcp4_start(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_start != data.decode('ascii'):
self.dhcp4_start = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('network.ip4.dhcp_end', self.vni))
def watch_network_dhcp4_end(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_end != data.decode('ascii'):
self.dhcp4_end = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zkhandler.zk_conn.ChildrenWatch(self.zkhandler.schema.path('network.reservation', self.vni))
def watch_network_dhcp_reservations(new_reservations, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if self.dhcp_reservations != new_reservations:
old_reservations = self.dhcp_reservations
self.dhcp_reservations = new_reservations
if self.this_node.router_state in ['primary', 'takeover']:
self.updateDHCPReservations(old_reservations, new_reservations)
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
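# Watch the per-direction firewall rule lists and re-render the nftables configuration whenever they change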
@self.zkhandler.zk_conn.ChildrenWatch(self.zkhandler.schema.path('network.rule.in', self.vni))
def watch_network_firewall_rules_in(new_rules, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Only update when the rule list actually changes
if self.firewall_rules_in != new_rules:
self.firewall_rules_in = new_rules
self.updateFirewallRules()
@self.zkhandler.zk_conn.ChildrenWatch(self.zkhandler.schema.path('network.rule.out', self.vni))
def watch_network_firewall_rules_out(new_rules, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Only update when the rule list actually changes
if self.firewall_rules_out != new_rules:
self.firewall_rules_out = new_rules
self.updateFirewallRules()
self.createNetworkManaged()
self.createFirewall()
def getvni(self):
return self.vni
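# Sync the dnsmasq hosts directory with the reservations in Zookeeper: write a '<reservation>,<ip>' entry file for each new reservation and delete files for removed ones, HUPing dnsmasq to pick up removals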
def updateDHCPReservations(self, old_reservations_list, new_reservations_list):
for reservation in new_reservations_list:
if reservation not in old_reservations_list:
# Add new reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
ipaddr = self.zkhandler.read(('network.reservation', self.vni, 'reservation.ip', reservation))
entry = '{},{}'.format(reservation, ipaddr)
# Write the entry
with open(filename, 'w') as outfile:
outfile.write(entry)
for reservation in old_reservations_list:
if reservation not in new_reservations_list:
# Remove old reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
try:
os.remove(filename)
self.dhcp_server_daemon.signal('hup')
except Exception:
pass
def updateFirewallRules(self):
if not self.ip4_network:
return
self.logger.out(
'Updating firewall rules',
prefix='VNI {}'.format(self.vni),
state='o'
)
ordered_acls_in = {}
ordered_acls_out = {}
sorted_acl_list = {'in': [], 'out': []}
full_ordered_rules = []
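# Collect each rule's 'order' key from Zookeeper and sort both directions by it, so user rules are rendered in their configured order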
for acl in self.firewall_rules_in:
order = self.zkhandler.read(('network.rule.in', self.vni, 'rule.order', acl))
ordered_acls_in[order] = acl
for acl in self.firewall_rules_out:
order = self.zkhandler.read(('network.rule.out', self.vni, 'rule.order', acl))
ordered_acls_out[order] = acl
for order in sorted(ordered_acls_in.keys()):
sorted_acl_list['in'].append(ordered_acls_in[order])
for order in sorted(ordered_acls_out.keys()):
sorted_acl_list['out'].append(ordered_acls_out[order])
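# Render each ACL as an nftables statement in the per-VNI chain; for example (hypothetically, VNI 100 with a stored rule 'tcp dport 22 accept') this yields: add rule inet filter vxlan100-in counter tcp dport 22 accept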
for direction in 'in', 'out':
for acl in sorted_acl_list[direction]:
rule_prefix = "add rule inet filter vxlan{}-{} counter".format(self.vni, direction)
rule_data = self.zkhandler.read((f'network.rule.{direction}', self.vni, 'rule.rule', acl))
rule = '{} {}'.format(rule_prefix, rule_data)
full_ordered_rules.append(rule)
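# Start from the base ruleset and append the v6/v4 gateway rule sets only when the corresponding gateway is configured, then the user-defined rules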
firewall_rules = self.firewall_rules_base
if self.ip6_gateway != 'None':
firewall_rules += self.firewall_rules_v6
if self.ip4_gateway != 'None':
firewall_rules += self.firewall_rules_v4
output = "{}\n# User rules\n{}\n".format(
firewall_rules,
'\n'.join(full_ordered_rules))
with open(self.nftables_netconf_filename, 'w') as nfnetfile:
nfnetfile.write(dedent(output))
# Reload firewall rules
nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
common.reload_firewall_rules(nftables_base_filename, logger=self.logger)
# Create bridged network configuration
def createNetworkBridged(self):
self.logger.out(
'Creating bridged vLAN device {} on interface {}'.format(
self.vlan_nic,
self.bridge_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Create vLAN interface
common.run_os_command(
'ip link add link {} name {} type vlan id {}'.format(
self.bridge_dev,
self.vlan_nic,
self.vni
)
)
# Create bridge interface
common.run_os_command(
'brctl addbr {}'.format(
self.bridge_nic
)
)
# Set MTU of vLAN and bridge NICs
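# Bridged networks carry no encapsulation overhead, so the vLAN and bridge devices use the full cluster MTU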
vx_mtu = self.cluster_mtu
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vlan_nic,
vx_mtu
)
)
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.bridge_nic,
vx_mtu
)
)
# Disable tx checksum offload on bridge interface (breaks DHCP on Debian < 9)
common.run_os_command(
'ethtool -K {} tx off'.format(
self.bridge_nic
)
)
# Disable IPv6 on bridge interface (prevents leakage)
common.run_os_command(
'sysctl net.ipv6.conf.{}.disable_ipv6=1'.format(
self.bridge_nic
)
)
# Add vLAN interface to bridge interface
common.run_os_command(
'brctl addif {} {}'.format(
self.bridge_nic,
self.vlan_nic
)
)
# Create managed network configuration
def createNetworkManaged(self):
self.logger.out(
'Creating VXLAN device on interface {}'.format(
self.cluster_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Create VXLAN interface
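# dstport 4789 is the IANA-assigned VXLAN UDP port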
common.run_os_command(
'ip link add {} type vxlan id {} dstport 4789 dev {}'.format(
self.vxlan_nic,
self.vni,
self.cluster_dev
)
)
# Create bridge interface
common.run_os_command(
'brctl addbr {}'.format(
self.bridge_nic
)
)
# Set MTU of VXLAN and bridge NICs
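# VXLAN encapsulation adds 50 bytes of overhead (outer Ethernet + IP + UDP + VXLAN headers), so e.g. a 1500-byte cluster MTU yields a 1450-byte VXLAN MTU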
vx_mtu = self.cluster_mtu - 50
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vxlan_nic,
vx_mtu
)
)
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.bridge_nic,
vx_mtu
)
)
# Disable tx checksum offload on bridge interface (breaks DHCP on Debian < 9)
common.run_os_command(
'ethtool -K {} tx off'.format(
self.bridge_nic
)
)
# Disable IPv6 DAD on bridge interface
common.run_os_command(
'sysctl net.ipv6.conf.{}.accept_dad=0'.format(
self.bridge_nic
)
)
# Add VXLAN interface to bridge interface
common.run_os_command(
'brctl addif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
def createFirewall(self):
if self.nettype == 'managed':
# Apply the user-defined firewall rules (managed networks only)
self.updateFirewallRules()
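# Create gateway addresses on the bridge for whichever address families have a gateway configured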
def createGateways(self):
if self.nettype == 'managed':
if self.ip6_gateway != 'None':
self.createGateway6Address()
if self.ip4_gateway != 'None':
self.createGateway4Address()
def createGateway6Address(self):
if self.this_node.router_state in ['primary', 'takeover']:
self.logger.out(
'Creating gateway {}/{} on interface {}'.format(
self.ip6_gateway,
self.ip6_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.createIPAddress(self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic)
def createGateway4Address(self):
if self.this_node.router_state in ['primary', 'takeover']:
self.logger.out(
'Creating gateway {}/{} on interface {}'.format(
self.ip4_gateway,
self.ip4_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.createIPAddress(self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic)
def startDHCPServer(self):
if self.this_node.router_state in ['primary', 'takeover'] and self.nettype == 'managed':
self.logger.out(
'Starting dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Recreate the environment we need for dnsmasq
pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE']
dhcp_environment = {
'DNSMASQ_BRIDGE_INTERFACE': self.bridge_nic,
'PVCD_CONFIG_FILE': pvcnoded_config_file
}
# Define the dnsmasq config fragments
dhcp_configuration_base = [
'--domain-needed',
'--bogus-priv',
'--no-hosts',
'--dhcp-authoritative',
'--filterwin2k',
'--expand-hosts',
'--domain={}'.format(self.domain),
'--local=/{}/'.format(self.domain),
'--log-facility=-',
'--log-dhcp',
'--keep-in-foreground',
'--leasefile-ro',
'--dhcp-script={}/pvcnoded/dnsmasq-zookeeper-leases.py'.format(os.getcwd()),
'--dhcp-hostsdir={}'.format(self.dnsmasq_hostsdir),
'--bind-interfaces',
]
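# The per-family fragments below bind dnsmasq to the respective gateway address and serve the authoritative DNS zone for the network domain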
dhcp_configuration_v4 = [
'--listen-address={}'.format(self.ip4_gateway),
'--auth-zone={}'.format(self.domain),
'--auth-peer={}'.format(self.ip4_gateway),
'--auth-server={}'.format(self.ip4_gateway),
'--auth-sec-servers={}'.format(self.ip4_gateway),
]
dhcp_configuration_v4_dhcp = [
'--dhcp-option=option:ntp-server,{}'.format(self.ip4_gateway),
'--dhcp-range={},{},48h'.format(self.dhcp4_start, self.dhcp4_end),
]
dhcp_configuration_v6 = [
'--listen-address={}'.format(self.ip6_gateway),
'--auth-zone={}'.format(self.domain),
'--auth-peer={}'.format(self.ip6_gateway),
'--auth-server={}'.format(self.ip6_gateway),
'--auth-sec-servers={}'.format(self.ip6_gateway),
'--dhcp-option=option6:dns-server,[{}]'.format(self.ip6_gateway),
'--dhcp-option=option6:sntp-server,[{}]'.format(self.ip6_gateway),
'--enable-ra',
]
dhcp_configuration_v6_dualstack = [
'--dhcp-range=net:{nic},::,constructor:{nic},ra-stateless,ra-names'.format(nic=self.bridge_nic),
]
dhcp_configuration_v6_only = [
'--auth-server={}'.format(self.ip6_gateway),
'--dhcp-range=net:{nic},::2,::ffff:ffff:ffff:ffff,constructor:{nic},64,24h'.format(nic=self.bridge_nic),
]
# Assemble the DHCP configuration
dhcp_configuration = dhcp_configuration_base
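# With DHCPv6 enabled the v6 fragments are used: stateless RA for dual-stack (DHCPv4 also enabled) or a stateful DHCPv6 range otherwise; without DHCPv6 the v4 fragments are used instead, and the DHCPv4 range is appended whenever DHCPv4 is enabled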
if self.dhcp6_flag:
dhcp_configuration += dhcp_configuration_v6
if self.dhcp4_flag:
dhcp_configuration += dhcp_configuration_v6_dualstack
else:
dhcp_configuration += dhcp_configuration_v6_only
else:
dhcp_configuration += dhcp_configuration_v4
if self.dhcp4_flag:
dhcp_configuration += dhcp_configuration_v4_dhcp
# Start the dnsmasq process in a thread
print('/usr/sbin/dnsmasq {}'.format(' '.join(dhcp_configuration)))
self.dhcp_server_daemon = common.run_os_daemon(
'/usr/sbin/dnsmasq {}'.format(
' '.join(dhcp_configuration)
),
environment=dhcp_environment,
logfile='{}/dnsmasq-{}.log'.format(self.config['dnsmasq_log_directory'], self.vni)
)
# Remove network
def removeNetwork(self):
if self.nettype == 'bridged':
self.removeNetworkBridged()
elif self.nettype == 'managed':
self.removeNetworkManaged()
# Remove bridged network configuration
def removeNetworkBridged(self):
self.logger.out(
'Removing bridged vLAN device {} from interface {}'.format(
self.vlan_nic,
self.bridge_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link set {} down'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link set {} down'.format(
self.vlan_nic
)
)
common.run_os_command(
'brctl delif {} {}'.format(
self.bridge_nic,
self.vlan_nic
)
)
common.run_os_command(
'brctl delbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link delete {}'.format(
self.vlan_nic
)
)
# Remove managed network configuration
def removeNetworkManaged(self):
self.logger.out(
'Removing VNI device on interface {}'.format(
self.cluster_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link set {} down'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link set {} down'.format(
self.vxlan_nic
)
)
common.run_os_command(
'brctl delif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
common.run_os_command(
'brctl delbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link delete {}'.format(
self.vxlan_nic
)
)
def removeFirewall(self):
self.logger.out(
'Removing firewall rules',
prefix='VNI {}'.format(self.vni),
state='o'
)
try:
os.remove(self.nftables_netconf_filename)
except Exception:
pass
# Reload firewall rules
nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
common.reload_firewall_rules(nftables_base_filename, logger=self.logger)
def removeGateways(self):
if self.nettype == 'managed':
if self.ip6_gateway != 'None':
self.removeGateway6Address()
if self.ip4_gateway != 'None':
self.removeGateway4Address()
def removeGateway6Address(self):
self.logger.out(
'Removing gateway {}/{} from interface {}'.format(
self.ip6_gateway,
self.ip6_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.removeIPAddress(self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic)
def removeGateway4Address(self):
self.logger.out(
'Removing gateway {}/{} from interface {}'.format(
self.ip4_gateway,
self.ip4_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.removeIPAddress(self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic)
def stopDHCPServer(self):
if self.nettype == 'managed' and self.dhcp_server_daemon:
self.logger.out(
'Stopping dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Terminate, then kill
self.dhcp_server_daemon.signal('term')
time.sleep(0.2)
self.dhcp_server_daemon.signal('kill')
self.dhcp_server_daemon = None
