Use consistent naming of components

Rename "pvcd" to "pvcnoded", and "pvc-api" to "pvcapid" so names for the
daemons are fully consistent. Update the names of the configuration
files as well to match this new formatting.

References #79
2020-02-08 19:16:19 -05:00
parent 83704d8677
commit ce985234c3
43 changed files with 158 additions and 159 deletions



@@ -0,0 +1,531 @@
#!/usr/bin/env python3
# DNSAggregatorInstance.py - Class implementing a DNS aggregator and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import time
import threading
import dns.zone
import dns.query
import psycopg2
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
class DNSAggregatorInstance(object):
# Initialization function
def __init__(self, zk_conn, config, logger):
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.dns_networks = dict()
self.is_active = False
self.dns_server_daemon = PowerDNSInstance(self)
self.dns_axfr_daemon = AXFRDaemonInstance(self)
# Start up the PowerDNS instance
def start_aggregator(self):
# Restart the SQL connection
self.dns_server_daemon.start()
self.dns_axfr_daemon.start()
self.is_active = True
# Stop the PowerDNS instance
def stop_aggregator(self):
self.is_active = False
self.dns_axfr_daemon.stop()
self.dns_server_daemon.stop()
def add_network(self, network):
self.dns_networks[network] = DNSNetworkInstance(self, network)
self.dns_networks[network].add_network()
self.dns_axfr_daemon.update_networks(self.dns_networks)
def remove_network(self, network):
if self.dns_networks[network]:
self.dns_networks[network].remove_network()
del self.dns_networks[network]
self.dns_axfr_daemon.update_networks(self.dns_networks)
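# Illustrative usage (a sketch, not part of this file): pvcnoded's Daemon.py
# is assumed to drive this class roughly as follows, with zk_conn, config,
# logger and the network objects coming from daemon startup:
#   aggregator = DNSAggregatorInstance(zk_conn, config, logger)
#   aggregator.start_aggregator()       # on becoming primary coordinator
#   aggregator.add_network(network)     # when a managed network is created
#   aggregator.remove_network(network)  # when a managed network is removed
#   aggregator.stop_aggregator()        # on relinquishing primary status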
class PowerDNSInstance(object):
# Initialization function
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_server_daemon = None
# Floating upstreams
self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
def start(self):
self.logger.out(
'Starting PowerDNS zone aggregator',
state='i'
)
# Define the PowerDNS config
dns_configuration = [
# Option # Explanation
'--no-config',
'--daemon=no', # Start directly
'--guardian=yes', # Use a guardian
'--disable-syslog=yes', # Log only to stdout (which is then captured)
'--disable-axfr=no', # Allow AXFRs
'--allow-axfr-ips=0.0.0.0/0', # Allow AXFRs to anywhere
'--local-address={},{}'.format(self.vni_ipaddr, self.upstream_ipaddr), # Listen on floating IPs
'--local-port=53', # On port 53
'--log-dns-details=on', # Log details
'--loglevel=3', # Log info
'--master=yes', # Enable master mode
'--slave=yes', # Enable slave mode
'--slave-renotify=yes', # Renotify out for our slaved zones
'--version-string=powerdns', # Set the version string
'--default-soa-name=dns.pvc.local', # Override dnsmasq's invalid name
'--socket-dir={}'.format(self.config['pdns_dynamic_directory']), # Standard socket directory
'--launch=gpgsql', # Use the PostgreSQL backend
'--gpgsql-host={}'.format(self.config['pdns_postgresql_host']), # PostgreSQL instance
'--gpgsql-port={}'.format(self.config['pdns_postgresql_port']), # Default port
'--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']), # Database name
'--gpgsql-user={}'.format(self.config['pdns_postgresql_user']), # User name
'--gpgsql-password={}'.format(self.config['pdns_postgresql_password']), # User password
'--gpgsql-dnssec=no', # Do DNSSEC elsewhere
]
# Start the pdns process in a thread
self.dns_server_daemon = common.run_os_daemon(
'/usr/sbin/pdns_server {}'.format(
' '.join(dns_configuration)
),
environment=None,
logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory'])
)
if self.dns_server_daemon:
self.logger.out(
'Successfully started PowerDNS zone aggregator',
state='o'
)
def stop(self):
if self.dns_server_daemon:
self.logger.out(
'Stopping PowerDNS zone aggregator',
state='i'
)
# Terminate, then kill
self.dns_server_daemon.signal('term')
time.sleep(0.2)
self.dns_server_daemon.signal('kill')
self.logger.out(
'Successfully stopped PowerDNS zone aggregator',
state='o'
)
class DNSNetworkInstance(object):
# Initialization function
def __init__(self, aggregator, network):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.sql_conn = None
self.network = network
# Add a new network to the aggregator database
def add_network(self):
network_domain = self.network.domain
if self.network.ip4_gateway != 'None':
network_gateway = self.network.ip4_gateway
else:
network_gateway = self.network.ip6_gateway
self.logger.out(
'Adding entry for client domain {}'.format(
network_domain
),
prefix='DNS aggregator',
state='o'
)
# Connect to the database
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Try to access the domains entry
sql_curs.execute(
"SELECT * FROM domains WHERE name=%s",
(network_domain,)
)
results = sql_curs.fetchone()
# If we got back a result, don't try to add the domain to the DB
if results:
write_domain = False
else:
write_domain = True
# Write the domain to the database if we're active
if self.aggregator.is_active and write_domain:
sql_curs.execute(
"INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)",
(network_domain,)
)
self.sql_conn.commit()
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = sql_curs.fetchone()
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
""",
(domain_id, network_domain, 'nsX.{d} root.{d} 1 10800 1800 86400 86400'.format(d=self.config['upstream_domain']), 'SOA', 86400, 0)
)
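# (SOA content fields above: mname rname serial refresh retry expire minimum,
# so each zone starts at serial 1 and the AXFR daemon below bumps it on change)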
if self.network.name_servers:
ns_servers = self.network.name_servers
else:
ns_servers = ['pvc-dns.{}'.format(self.config['upstream_domain'])]
for ns_server in ns_servers:
sql_curs.execute(
"""
INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
(%s, %s, %s, %s, %s, %s)
""",
(domain_id, network_domain, ns_server, 'NS', 86400, 0)
)
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
# Remove a deleted network from the aggregator database
def remove_network(self):
network_domain = self.network.domain
self.logger.out(
'Removing entry for client domain {}'.format(
network_domain
),
prefix='DNS aggregator',
state='o'
)
# Connect to the database
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
sql_curs = self.sql_conn.cursor()
# Get the domain ID
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(network_domain,)
)
domain_id = sql_curs.fetchone()
# Delete the domain from the database if we're active
if self.aggregator.is_active and domain_id:
sql_curs.execute(
"DELETE FROM domains WHERE id=%s",
(domain_id,)
)
sql_curs.execute(
"DELETE FROM records WHERE domain_id=%s",
(domain_id,)
)
self.sql_conn.commit()
self.sql_conn.close()
self.sql_conn = None
class AXFRDaemonInstance(object):
# Initialization function
def __init__(self, aggregator):
self.aggregator = aggregator
self.config = self.aggregator.config
self.logger = self.aggregator.logger
self.dns_networks = self.aggregator.dns_networks
self.thread_stopper = threading.Event()
self.thread = None
self.sql_conn = None
def update_networks(self, dns_networks):
self.dns_networks = dns_networks
def start(self):
# Create the thread
self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={})
# Start a local instance of the SQL connection
# Trying to use the instance from the main DNS Aggregator can result in connection failures
# after the leader transitions
self.sql_conn = psycopg2.connect(
"host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
self.config['pdns_postgresql_host'],
self.config['pdns_postgresql_port'],
self.config['pdns_postgresql_dbname'],
self.config['pdns_postgresql_user'],
self.config['pdns_postgresql_password']
)
)
# Start the thread
self.thread.start()
def stop(self):
self.thread_stopper.set()
if self.sql_conn:
self.sql_conn.close()
self.sql_conn = None
def run(self):
# Wait for all the DNSMASQ instances to actually start
time.sleep(5)
while not self.thread_stopper.is_set():
# We do this for each network
for network, instance in self.dns_networks.items():
zone_modified = False
# Set up our SQL cursor
sql_curs = self.sql_conn.cursor()
# Set up our basic variables
domain = network.domain
if network.ip4_gateway != 'None':
dnsmasq_ip = network.ip4_gateway
else:
dnsmasq_ip = network.ip6_gateway
#
# Get an AXFR from the dnsmasq instance and list of records
#
try:
axfr = dns.query.xfr(dnsmasq_ip, domain, lifetime=5.0)
z = dns.zone.from_xfr(axfr)
records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
except Exception as e:
if self.config['debug']:
print('{} {} ({})'.format(e, dnsmasq_ip, domain))
continue
# Fix the formatting because it's useless
# reference: ['@ 600 IN SOA . . 4 1200 180 1209600 600\n@ 600 IN NS .', 'test3 600 IN A 10.1.1.203\ntest3 600 IN AAAA 2001:b23e:1113:0:5054:ff:fe5c:f131', etc.]
# We don't really care about dnsmasq's terrible SOA or NS records which are in [0]
string_records = '\n'.join(records_raw[1:])
# Split into individual records
records_new = list()
for element in string_records.split('\n'):
if element:
record = element.split()
# Handle space-containing data elements
if domain not in record[0]:
name = '{}.{}'.format(record[0], domain)
else:
name = record[0]
entry = '{} {} IN {} {}'.format(name, record[1], record[3], ' '.join(record[4:]))
records_new.append(entry)
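# A standalone sketch of the normalization above, on assumed sample data:
#   domain = 'net1.pvc.local'
#   element = 'test3 600 IN A 10.1.1.203'
#   record = element.split()
#   name = record[0] if domain in record[0] else '{}.{}'.format(record[0], domain)
#   entry = '{} {} IN {} {}'.format(name, record[1], record[3], ' '.join(record[4:]))
#   # entry == 'test3.net1.pvc.local 600 IN A 10.1.1.203'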
#
# Get the current zone from the database
#
try:
sql_curs.execute(
"SELECT id FROM domains WHERE name=%s",
(domain,)
)
domain_id = sql_curs.fetchone()
sql_curs.execute(
"SELECT * FROM records WHERE domain_id=%s",
(domain_id,)
)
results = list(sql_curs.fetchall())
if self.config['debug']:
print('SQL query results: {}'.format(results))
except Exception as e:
self.logger.out('ERROR: Failed to obtain DNS records from database: {}'.format(e))
# Fix the formatting because it's useless for comparison
# reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.)
records_old = list()
records_old_ids = list()
if not results:
if self.config['debug']:
print('No results found, skipping.')
continue
for record in results:
# Extract the record's fields
r_id = record[0]
r_name = record[2]
r_ttl = record[5]
r_type = record[3]
r_data = record[4]
# Assemble a list element in the same format as the AXFR data
entry = '{} {} IN {} {}'.format(r_name, r_ttl, r_type, r_data)
if self.config['debug']:
print('Found record: {}'.format(entry))
# Skip non-A or AAAA records
if r_type != 'A' and r_type != 'AAAA':
if self.config['debug']:
print('Skipping record {}, not A or AAAA: "{}"'.format(entry, r_type))
continue
records_old.append(entry)
records_old_ids.append(r_id)
records_new.sort()
records_old.sort()
if self.config['debug']:
print('New: {}'.format(records_new))
print('Old: {}'.format(records_old))
# Find the differences between the lists
# Basic check one: are they completely equal
if records_new != records_old:
# Get set elements
in_new = set(records_new)
in_old = set(records_old)
in_new_not_in_old = in_new - in_old
in_old_not_in_new = in_old - in_new
if self.config['debug']:
print('New but not old: {}'.format(in_new_not_in_old))
print('Old but not new: {}'.format(in_old_not_in_new))
# Go through the old list
remove_records = list() # list of database IDs
for i in range(len(records_old)):
record_id = records_old_ids[i]
record = records_old[i]
splitrecord = records_old[i].split()
# If the record is not in the new list, remove it
if record in in_old_not_in_new:
remove_records.append(record_id)
# Go through the new elements
for newrecord in in_new_not_in_old:
splitnewrecord = newrecord.split()
# If there's a name and type match with different content, remove the old one
if splitrecord[0] == splitnewrecord[0] and splitrecord[3] == splitnewrecord[3]:
remove_records.append(record_id)
changed = False
if len(remove_records) > 0:
# Remove the invalid old records
for record_id in remove_records:
if self.config['debug']:
print('Removing record: {}'.format(record_id))
sql_curs.execute(
"DELETE FROM records WHERE id=%s",
(record_id,)
)
changed = True
if len(in_new_not_in_old) > 0:
# Add the new records
for record in in_new_not_in_old:
# [NAME, TTL, 'IN', TYPE, DATA]
record = record.split()
r_name = record[0]
r_ttl = record[1]
r_type = record[3]
r_data = ' '.join(record[4:]) # Rejoin any multi-token record data
if self.config['debug']:
print('Add record: {}'.format(r_name))
try:
sql_curs.execute(
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
(domain_id, r_name, r_ttl, r_type, 0, r_data)
)
changed = True
except psycopg2.IntegrityError as e:
if self.config['debug']:
print('Failed to add record due to {}: {}'.format(e, r_name))
if changed:
# Increase SOA serial
sql_curs.execute(
"SELECT content FROM records WHERE domain_id=%s AND type='SOA'",
(domain_id,)
)
soa_record = list(sql_curs.fetchone())[0].split()
current_serial = int(soa_record[2])
new_serial = current_serial + 1
soa_record[2] = str(new_serial)
if self.config['debug']:
print('Records changed; bumping SOA: {}'.format(new_serial))
sql_curs.execute(
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
(' '.join(soa_record), domain_id)
)
# Commit all the previous changes
if self.config['debug']:
print('Committing database changes and reloading PDNS')
try:
self.sql_conn.commit()
except Exception as e:
self.logger.out('ERROR: Failed to commit DNS aggregator changes: {}'.format(e), state='e')
# Reload the domain
common.run_os_command(
'/usr/bin/pdns_control --socket-dir={} reload {}'.format(
self.config['pdns_dynamic_directory'],
domain
),
background=False
)
# Wait for 10 seconds
time.sleep(10)
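# The reconciliation above, reduced to a standalone sketch with assumed data:
#   records_new = ['a.net1.pvc.local 600 IN A 10.0.0.1']
#   records_old = ['a.net1.pvc.local 600 IN A 10.0.0.9']
#   set(records_new) - set(records_old)  # -> entries to INSERT
#   set(records_old) - set(records_new)  # -> stale entries to DELETE
#   # any change then bumps the zone's SOA serial so slaves renotify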



@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# MetadataAPIInstance.py - Class implementing an EC2-compatible cloud-init Metadata server
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import gevent.pywsgi
import flask
import threading
import sys
import time
import psycopg2
from psycopg2.extras import RealDictCursor
# The metadata server requires client libraries
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
class MetadataAPIInstance(object):
mdapi = flask.Flask(__name__)
# Initialization function
def __init__(self, zk_conn, config, logger):
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.thread = None
self.md_http_server = None
self.add_routes()
# Add flask routes inside our instance
def add_routes(self):
@self.mdapi.route('/', methods=['GET'])
def api_root():
return flask.jsonify({"message": "PVC Provisioner Metadata API version 1"}), 200
@self.mdapi.route('/<version>/meta-data/', methods=['GET'])
def api_metadata_root(version):
metadata = """instance-id\nname\nprofile"""
return metadata, 200
@self.mdapi.route('/<version>/meta-data/instance-id', methods=['GET'])
def api_metadata_instanceid(version):
source_address = flask.request.remote_addr
vm_details = self.get_vm_details(source_address)
instance_id = vm_details.get('uuid', None)
return instance_id, 200
@self.mdapi.route('/<version>/meta-data/name', methods=['GET'])
def api_metadata_hostname(version):
source_address = flask.request.remote_addr
vm_details = self.get_vm_details(source_address)
vm_name = vm_details.get('name', None)
return vm_name, 200
@self.mdapi.route('/<version>/meta-data/profile', methods=['GET'])
def api_metadata_profile(version):
source_address = flask.request.remote_addr
vm_details = self.get_vm_details(source_address)
vm_profile = vm_details.get('profile', None)
return vm_profile, 200
@self.mdapi.route('/<version>/user-data', methods=['GET'])
def api_userdata(version):
source_address = flask.request.remote_addr
vm_details = self.get_vm_details(source_address)
vm_profile = vm_details.get('profile', None)
# Get the userdata
if vm_profile:
userdata = self.get_profile_userdata(vm_profile)
self.logger.out("Returning userdata for profile {}".format(vm_profile), state='i', prefix='Metadata API')
else:
userdata = None
return flask.Response(userdata)
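# From a guest VM on a managed network, these endpoints could be exercised
# like this (a sketch; assumes the 'requests' package is available):
#   import requests
#   base = 'http://169.254.169.254/latest'
#   requests.get(base + '/meta-data/').text            # 'instance-id\nname\nprofile'
#   requests.get(base + '/meta-data/instance-id').text
#   requests.get(base + '/user-data').text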
def launch_wsgi(self):
try:
self.md_http_server = gevent.pywsgi.WSGIServer(
('169.254.169.254', 80),
self.mdapi,
log=sys.stdout,
error_log=sys.stdout
)
self.md_http_server.serve_forever()
except Exception as e:
self.logger.out('Error starting Metadata API: {}'.format(e), state='e')
# WSGI start/stop
def start(self):
# Launch Metadata API
self.logger.out('Starting Metadata API at 169.254.169.254:80', state='i')
self.thread = threading.Thread(target=self.launch_wsgi)
self.thread.start()
self.logger.out('Successfully started Metadata API thread', state='o')
def stop(self):
if not self.md_http_server:
return
self.logger.out('Stopping Metadata API at 169.254.169.254:80', state='i')
try:
self.md_http_server.stop()
time.sleep(0.1)
self.md_http_server.close()
time.sleep(0.1)
self.md_http_server = None
self.logger.out('Successfully stopped Metadata API', state='o')
except Exception as e:
self.logger.out('Error stopping Metadata API: {}'.format(e), state='e')
# Helper functions
def open_database(self):
conn = psycopg2.connect(
host=self.config['metadata_postgresql_host'],
port=self.config['metadata_postgresql_port'],
dbname=self.config['metadata_postgresql_dbname'],
user=self.config['metadata_postgresql_user'],
password=self.config['metadata_postgresql_password']
)
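# RealDictCursor makes fetchone()/fetchall() return dict rows keyed by column name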
cur = conn.cursor(cursor_factory=RealDictCursor)
return conn, cur
def close_database(self, conn, cur):
cur.close()
conn.close()
# Obtain a list of templates
def get_profile_userdata(self, vm_profile):
query = """SELECT userdata.userdata FROM profile
JOIN userdata ON profile.userdata = userdata.id
WHERE profile.name = %s;
"""
args = (vm_profile,)
conn, cur = self.open_database()
cur.execute(query, args)
data_raw = cur.fetchone()
self.close_database(conn, cur)
# Guard against a missing profile row
data = data_raw.get('userdata', None) if data_raw else None
return data
# VM details function
def get_vm_details(self, source_address):
# Start connection to Zookeeper
_discard, networks = pvc_network.get_list(self.zk_conn, None)
# Figure out which server this is via the DHCP address
host_information = dict()
networks_managed = (x for x in networks if x.get('type') == 'managed')
for network in networks_managed:
network_leases = pvc_network.getNetworkDHCPLeases(self.zk_conn, network.get('vni'))
for network_lease in network_leases:
information = pvc_network.getDHCPLeaseInformation(self.zk_conn, network.get('vni'), network_lease)
try:
if information.get('ip4_address', None) == source_address:
host_information = information
except:
pass
# Get our real information on the host; now we can start querying about it
client_hostname = host_information.get('hostname', None)
client_macaddr = host_information.get('mac_address', None)
client_ipaddr = host_information.get('ip4_address', None)
# Find the VM with that MAC address - we can't assume that the hostname is actually right
_discard, vm_list = pvc_vm.get_list(self.zk_conn, None, None, None)
vm_name = None
vm_details = dict()
for vm in vm_list:
try:
for network in vm.get('networks'):
if network.get('mac', None) == client_macaddr:
vm_name = vm.get('name')
vm_details = vm
except:
pass
return vm_details


@@ -0,0 +1,714 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import psutil
import socket
import time
import libvirt
import threading
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
class NodeInstance(object):
# Initialization function
def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):
# Passed-in variables on creation
self.name = name
self.this_node = this_node
self.zk_conn = zk_conn
self.config = config
self.logger = logger
# Which node is primary
self.primary_node = None
# States
self.daemon_mode = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonmode'.format(self.name))
self.daemon_state = 'stop'
self.router_state = 'client'
self.domain_state = 'ready'
# Object lists
self.d_node = d_node
self.d_network = d_network
self.d_domain = d_domain
self.dns_aggregator = dns_aggregator
self.metadata_api = metadata_api
# Printable lists
self.active_node_list = []
self.flushed_node_list = []
self.inactive_node_list = []
self.network_list = []
self.domain_list = []
# Node resources
self.domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Floating IP configurations
if self.config['enable_networking']:
self.vni_dev = self.config['vni_dev']
self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
self.upstream_dev = self.config['upstream_dev']
self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
else:
self.vni_dev = None
self.vni_ipaddr = None
self.vni_cidrnetmask = None
self.upstream_dev = None
self.upstream_ipaddr = None
self.upstream_cidrnetmask = None
# Threads
self.flush_thread = None
# Flags
self.flush_stopper = False
# Zookeeper handlers for changed states
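# (kazoo deregisters a DataWatch when its callback returns False, which is how
# the DELETED branches below clean up after a node is removed)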
@self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
def watch_node_daemonstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'stop'
if data != self.daemon_state:
self.daemon_state = data
@self.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
def watch_node_routerstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'client'
if self.name == self.this_node and self.daemon_mode == 'coordinator':
# We're a coordinator so we care about networking
if data != self.router_state:
self.router_state = data
if self.config['enable_networking']:
if self.router_state == 'primary':
self.logger.out('Setting node {} to primary state'.format(self.name), state='i')
transition_thread = threading.Thread(target=self.become_primary, args=(), kwargs={})
transition_thread.start()
else:
# Skip becoming secondary unless already running
if self.daemon_state == 'run' or self.daemon_state == 'shutdown':
self.logger.out('Setting node {} to secondary state'.format(self.name), state='i')
transition_thread = threading.Thread(target=self.become_secondary, args=(), kwargs={})
transition_thread.start()
@self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
def watch_node_domainstate(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 'unknown'
if data != self.domain_state:
self.domain_state = data
# toggle state management of this node
if self.name == self.this_node:
# Stop any existing flush jobs
if self.flush_thread is not None:
self.logger.out('Waiting for previous migration to complete', state='i')
self.flush_stopper = True
while self.flush_stopper:
time.sleep(0.1)
# Do flushing in a thread so it doesn't block the migrates out
if self.domain_state == 'flush':
self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={})
self.flush_thread.start()
# Do unflushing in a thread so it doesn't block the migrates in
if self.domain_state == 'unflush':
self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={})
self.flush_thread.start()
@self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
def watch_node_memfree(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
def watch_node_memused(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
def watch_node_memalloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
def watch_node_vcpualloc(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
def watch_node_runningdomains(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii').split()
except AttributeError:
data = []
if data != self.domain_list:
self.domain_list = data
@self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
def watch_node_domainscount(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = 0
if data != self.domains_count:
self.domains_count = data
# Update value functions
def update_node_list(self, d_node):
self.d_node = d_node
def update_network_list(self, d_network):
self.d_network = d_network
network_list = []
for network in self.d_network:
network_list.append(d_network[network].vni)
self.network_list = network_list
def update_domain_list(self, d_domain):
self.d_domain = d_domain
######
# Phases of node transition
#
# Current Primary Candidate Secondary
# -> secondary -> primary
#
# def become_secondary() def become_primary()
#
# A ----------------------------------------------------------------- SYNC (candidate)
# B ----------------------------------------------------------------- SYNC (current)
# 1. Stop DNS aggregator ||
# 2. Stop DHCP servers ||
# 2a) network 1 ||
# 2b) network 2 ||
# etc. ||
# 3. Stop client API ||
# 4. Stop metadata API ||
# --
# C ----------------------------------------------------------------- SYNC (candidate)
# 5. Remove upstream floating IP 1. Add upstream floating IP ||
# --
# D ----------------------------------------------------------------- SYNC (candidate)
# 6. Remove cluster floating IP 2. Add cluster floating IP ||
# --
# E ----------------------------------------------------------------- SYNC (candidate)
# 7. Remove metadata floating IP 3. Add metadata floating IP ||
# --
# F ----------------------------------------------------------------- SYNC (candidate)
# 8. Remove gateway IPs 4. Add gateway IPs ||
# 8a) network 1 4a) network 1 ||
# 8b) network 2 4b) network 2 ||
# etc. etc. ||
# --
# G ----------------------------------------------------------------- SYNC (candidate)
# 5. Transition Patroni primary ||
# 6. Start client API ||
# 7. Start metadata API ||
# 8. Start DHCP servers ||
# 8a) network 1 ||
# 8b) network 2 ||
# etc. ||
# 9. Start DNS aggregator ||
# --
######
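# A minimal standalone sketch of the write/read lock barrier used in each
# phase below, using kazoo directly (connection details are assumed):
#   from kazoo.client import KazooClient
#   zk = KazooClient(hosts='localhost:2181')
#   zk.start()
#   lock = zk.WriteLock('/locks/primary_node')
#   lock.acquire()   # the candidate holds the phase open...
#   # ... this phase's work happens here ...
#   lock.release()   # ...then the peer's ReadLock on the same path may proceed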
def become_primary(self):
"""
Acquire primary coordinator status from a peer node
"""
# Lock the primary node until transition is complete
primary_lock = zkhandler.writelock(self.zk_conn, '/primary_node')
primary_lock.acquire()
# Ensure our lock key is populated
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
# Synchronize nodes A (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization A', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization A', state='o')
time.sleep(1) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization A', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization A', state='o')
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization B', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization B', state='o')
self.logger.out('Releasing read lock for synchronization B', state='i')
lock.release()
self.logger.out('Released read lock for synchronization B', state='o')
# Synchronize nodes C (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization C', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization C', state='o')
time.sleep(0.5) # Time for reader to acquire the lock
# 1. Add Upstream floating IP
self.logger.out(
'Creating floating upstream IP {}/{} on interface {}'.format(
self.upstream_ipaddr,
self.upstream_cidrnetmask,
'brupstream'
),
state='o'
)
common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
self.logger.out('Releasing write lock for synchronization C', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization C', state='o')
# Synchronize nodes D (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization D', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization D', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 2. Add Cluster floating IP
self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format(
self.vni_ipaddr,
self.vni_cidrnetmask,
'brcluster'
),
state='o'
)
common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
self.logger.out('Releasing write lock for synchronization D', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization D', state='o')
# Synchronize nodes E (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization E', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization E', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 3. Add Metadata link-local IP
self.logger.out(
'Creating Metadata link-local IP {}/{} on interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.createIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing write lock for synchronization E', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization E', state='o')
# Synchronize nodes F (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization F', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization F', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 4. Add gateway IPs
for network in self.d_network:
self.d_network[network].createGateways()
self.logger.out('Releasing write lock for synchronization F', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization F', state='o')
# Synchronize nodes G (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization G', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization G', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 5. Transition Patroni primary
self.logger.out('Setting Patroni leader to this node', state='i')
tick = 1
patroni_failed = True
# As long as we're primary, keep trying to set the Patroni leader to us
while self.router_state == 'primary':
# Switch Patroni leader to the local instance
retcode, stdout, stderr = common.run_os_command(
"""
patronictl
-c /etc/patroni/config.yml
-d zookeeper://localhost:2181
switchover
--candidate {}
--force
pvc
""".format(self.name)
)
# Combine the stdout and stderr and strip the output
# Patronictl's output is pretty junky
if stderr:
stdout += stderr
stdout = stdout.strip()
# Handle our current Patroni leader being us
if stdout and stdout.split('\n')[-1].split() == ["Error:", "Switchover", "target", "and", "source", "are", "the", "same."]:
self.logger.out('Failed to switch Patroni leader to ourselves; this is fine\n{}'.format(stdout), state='w')
break
# Handle a failed switchover
elif stdout and (stdout.split('\n')[-1].split()[:2] == ["Switchover", "failed,"] or stdout.strip().split('\n')[-1].split()[:1] == ["Error"]):
if tick > 4:
self.logger.out('Failed to switch Patroni leader after 5 tries; aborting', state='e')
break
else:
self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}\n'.format(tick, stdout), state='e')
tick += 1
time.sleep(5)
# Otherwise, we succeeded
else:
self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
patroni_failed = False
time.sleep(0.2)
break
# 6. Start client API (and provisioner worker)
if self.config['enable_api']:
self.logger.out('Starting PVC API client service', state='i')
common.run_os_command("systemctl start pvcapid.service")
self.logger.out('Starting PVC Provisioner Worker service', state='i')
common.run_os_command("systemctl start pvcapid-worker.service")
# 7. Start metadata API; just continue if we fail
self.metadata_api.start()
# 8. Start DHCP servers
for network in self.d_network:
self.d_network[network].startDHCPServer()
# 9. Start DNS aggregator; just continue if we fail
if not patroni_failed:
self.dns_aggregator.start_aggregator()
else:
self.logger.out('Not starting DNS aggregator due to Patroni failures', state='e')
self.logger.out('Releasing write lock for synchronization G', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization G', state='o')
primary_lock.release()
self.logger.out('Node {} transitioned to primary state'.format(self.name), state='o')
def become_secondary(self):
"""
Relinquish primary coordinator status to a peer node
"""
time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Synchronize nodes A (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization A', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization A', state='o')
self.logger.out('Releasing read lock for synchronization A', state='i')
lock.release()
self.logger.out('Released read lock for synchronization A', state='o')
# Synchronize nodes B (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring write lock for synchronization B', state='i')
lock.acquire()
self.logger.out('Acquired write lock for synchronization B', state='o')
time.sleep(0.2) # Time for reader to acquire the lock
# 1. Stop DNS aggregator
self.dns_aggregator.stop_aggregator()
# 2. Stop DHCP servers
for network in self.d_network:
self.d_network[network].stopDHCPServer()
self.logger.out('Releasing write lock for synchronization B', state='i')
zkhandler.writedata(self.zk_conn, {'/locks/primary_node': ''})
lock.release()
self.logger.out('Released write lock for synchronization B', state='o')
# 3. Stop client API
if self.config['enable_api']:
self.logger.out('Stopping PVC API client service', state='i')
common.run_os_command("systemctl stop pvcapid.service")
# 4. Stop metadata API
self.metadata_api.stop()
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes C (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization C', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization C', state='o')
# 5. Remove Upstream floating IP
self.logger.out(
'Removing floating upstream IP {}/{} from interface {}'.format(
self.upstream_ipaddr,
self.upstream_cidrnetmask,
'brupstream'
),
state='o'
)
common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, 'brupstream')
self.logger.out('Releasing read lock for synchronization C', state='i')
lock.release()
self.logger.out('Released read lock for synchronization C', state='o')
# Synchronize nodes D (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization D', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization D', state='o')
# 6. Remove Cluster floating IP
self.logger.out(
'Removing floating management IP {}/{} from interface {}'.format(
self.vni_ipaddr,
self.vni_cidrnetmask,
'brcluster'
),
state='o'
)
common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
self.logger.out('Releasing read lock for synchronization D', state='i')
lock.release()
self.logger.out('Released read lock for synchronization D', state='o')
# Synchronize nodes E (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization E', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization E', state='o')
# 7. Remove Metadata link-local IP
self.logger.out(
'Removing Metadata link-local IP {}/{} from interface {}'.format(
'169.254.169.254',
'32',
'lo'
),
state='o'
)
common.removeIPAddress('169.254.169.254', '32', 'lo')
self.logger.out('Releasing read lock for synchronization E', state='i')
lock.release()
self.logger.out('Released read lock for synchronization E', state='o')
# Synchronize nodes F (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization F', state='i')
lock.acquire()
self.logger.out('Acquired read lock for synchronization F', state='o')
# 8. Remove gateway IPs
for network in self.d_network:
self.d_network[network].removeGateways()
self.logger.out('Releasing read lock for synchronization F', state='i')
lock.release()
self.logger.out('Released read lock for synchronization F', state='o')
# Synchronize nodes G (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/primary_node')
self.logger.out('Acquiring read lock for synchronization G', state='i')
try:
lock.acquire(timeout=60) # Don't wait forever and completely block us
self.logger.out('Acquired read lock for synchronization G', state='o')
except:
pass
self.logger.out('Releasing read lock for synchronization G', state='i')
lock.release()
self.logger.out('Released read lock for synchronization G', state='o')
self.logger.out('Node {} transitioned to secondary state'.format(self.name), state='o')
# Flush all VMs on the host
def flush(self):
# Begin flush
self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
self.logger.out('VM list: {}'.format(', '.join(self.domain_list)), state='i')
fixed_domain_list = self.domain_list.copy()
for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node flush', state='i')
self.flush_thread = None
self.flush_stopper = False
return
self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
target_node = common.findTargetNode(self.zk_conn, self.config, dom_uuid)
# Don't replace the previous node if the VM is already migrated
if zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)):
current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
else:
current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
if target_node is None:
self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' })
zkhandler.writedata(self.zk_conn, { '/domains/{}/node_autostart'.format(dom_uuid): 'True' })
# Wait for the VM to shut down
while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['shutdown']:
time.sleep(0.1)
continue
self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/node'.format(dom_uuid): target_node,
'/domains/{}/lastnode'.format(dom_uuid): current_node
})
# Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
time.sleep(0.1)
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' })
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
self.flush_thread = None
self.flush_stopper = False
return
def unflush(self):
self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')
fixed_domain_list = self.d_domain.copy()
for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node unflush', state='i')
self.flush_thread = None
self.flush_stopper = False
return
# Handle autostarts
autostart = zkhandler.readdata(self.zk_conn, '/domains/{}/node_autostart'.format(dom_uuid))
node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
if autostart == 'True' and node == self.name:
self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'start',
'/domains/{}/node'.format(dom_uuid): self.name,
'/domains/{}/lastnode'.format(dom_uuid): '',
'/domains/{}/node_autostart'.format(dom_uuid): 'False'
})
continue
try:
last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
except:
continue
if last_node != self.name:
continue
self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'migrate',
'/domains/{}/node'.format(dom_uuid): self.name,
'/domains/{}/lastnode'.format(dom_uuid): ''
})
# Wait for the VM to migrate back
while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) in ['migrate', 'unmigrate', 'shutdown']:
time.sleep(0.1)
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' })
self.flush_thread = None
self.flush_stopper = False
return


@@ -0,0 +1,101 @@
#!/usr/bin/env python3
# VMConsoleWatcherInstance.py - Class implementing a console log watcher for PVC domains
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import uuid
import time
import threading
import libvirt
from collections import deque
import fcntl
import signal
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
class VMConsoleWatcherInstance(object):
# Initialization function
def __init__(self, domuuid, domname, zk_conn, config, logger, this_node):
self.domuuid = domuuid
self.domname = domname
self.zk_conn = zk_conn
self.config = config
self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
self.console_log_lines = config['console_log_lines']
self.logger = logger
self.this_node = this_node
# Try to append (create) the logfile and set its permissions
open(self.logfile, 'a').close()
os.chmod(self.logfile, 0o600)
self.logdeque = deque(open(self.logfile), self.console_log_lines)
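# (a deque with maxlen keeps only the newest console_log_lines lines of the file)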
self.stamp = None
self.cached_stamp = None
# Set up the deque with the current contents of the log
self.last_loglines = None
self.loglines = None
# Thread options
self.thread = None
self.thread_stopper = threading.Event()
# Start execution thread
def start(self):
self.thread_stopper.clear()
self.thread = threading.Thread(target=self.run, args=(), kwargs={})
self.logger.out('Starting VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
self.thread.start()
# Stop execution thread
def stop(self):
if self.thread and self.thread.is_alive():
self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}:'.format(self.domuuid))
self.thread_stopper.set()
# Do one final flush
self.update()
# Main entrypoint
def run(self):
# Main loop
while not self.thread_stopper.is_set():
self.update()
time.sleep(0.5)
def update(self):
self.stamp = os.stat(self.logfile).st_mtime
if self.stamp != self.cached_stamp:
self.cached_stamp = self.stamp
self.fetch_lines()
# Update Zookeeper with the new loglines if they changed
if self.loglines != self.last_loglines:
zkhandler.writedata(self.zk_conn, { '/domains/{}/consolelog'.format(self.domuuid): self.loglines })
self.last_loglines = self.loglines
def fetch_lines(self):
self.logdeque = deque(open(self.logfile), self.console_log_lines)
self.loglines = ''.join(self.logdeque)


@@ -0,0 +1,600 @@
#!/usr/bin/env python3
# VMInstance.py - Class implementing a PVC virtual machine in pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import uuid
import socket
import time
import threading
import libvirt
import kazoo.client
import json
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
def flush_locks(zk_conn, logger, dom_uuid):
logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
# Get the list of RBD images
rbd_list = zkhandler.readdata(zk_conn, '/domains/{}/rbdlist'.format(dom_uuid)).split(',')
for rbd in rbd_list:
# Check if a lock exists
lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
if lock_list_retcode != 0:
logger.out('Failed to obtain lock list for volume "{}"'.format(rbd), state='e')
continue
try:
lock_list = json.loads(lock_list_stdout)
except Exception as e:
logger.out('Failed to parse lock list for volume "{}": {}'.format(rbd, e), state='e')
continue
# If there's at least one lock
if lock_list:
# Loop through the locks
for lock, detail in lock_list.items():
# Free the lock
lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock, detail['locker']))
if lock_remove_retcode != 0:
logger.out('Failed to free RBD lock "{}" on volume "{}"\n{}'.format(lock, rbd, lock_remove_stderr), state='e')
continue
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock, rbd), state='o')
return True
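# For reference, 'rbd lock list --format json' output is assumed to be a
# mapping of lock ID to detail, e.g. (abbreviated):
#   {"auto 140234882957312": {"locker": "client.4123", "address": "10.0.0.1:0/123456"}}
# which is why detail['locker'] above identifies the lock holder to remove.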
# Primary command function
def run_command(zk_conn, logger, this_node, data):
# Get the command and args
command, args = data.split()
# Flushing VM RBD locks
if command == 'flush_locks':
dom_uuid = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains')
with zk_lock:
# Flush the locks
result = flush_locks(zk_conn, logger, dom_uuid)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/domains': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/domains': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
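# The client side of this queue protocol is assumed to look roughly like:
#   zkhandler.writedata(zk_conn, {'/cmd/domains': 'flush_locks {}'.format(dom_uuid)})
#   while not zkhandler.readdata(zk_conn, '/cmd/domains').startswith(('success-', 'failure-')):
#       time.sleep(0.5)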
class VMInstance(object):
# Initialization function
def __init__(self, domuuid, zk_conn, config, logger, this_node):
# Passed-in variables on creation
self.domuuid = domuuid
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.this_node = this_node
# Get data from zookeeper
self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid))
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid))
try:
self.pinpolicy = zkhandler.readdata(self.zk_conn, '/domains/{}/pinpolicy'.format(self.domuuid))
except:
self.pinpolicy = "None"
# These will all be set later
self.instart = False
self.inrestart = False
self.inmigrate = False
self.inreceive = False
self.inshutdown = False
self.instop = False
# Libvirt domuuid
self.dom = self.lookupByUUID(self.domuuid)
# Log watcher instance
self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node)
# Watch for changes to the state field in Zookeeper
@self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid))
def watch_state(data, stat, event=""):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Perform a management command
self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
state_thread = threading.Thread(target=self.manage_vm_state, args=(), kwargs={})
state_thread.start()
# Get data functions
def getstate(self):
return self.state
def getnode(self):
return self.node
def getdom(self):
return self.dom
def getmemory(self):
try:
memory = int(self.dom.info()[2] / 1024)
except:
memory = 0
return memory
def getvcpus(self):
try:
vcpus = int(self.dom.info()[3])
except:
vcpus = 0
return vcpus
# Manage local node domain_list
def addDomainToList(self):
if self.domuuid not in self.this_node.domain_list:
try:
# Add the domain to the domain_list array
self.this_node.domain_list.append(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list) })
except Exception as e:
self.logger.out('Error adding domain to list: {}'.format(e), state='e')
def removeDomainFromList(self):
if self.domuuid in self.this_node.domain_list:
try:
# Remove the domain from the domain_list array
self.this_node.domain_list.remove(self.domuuid)
# Push the change up to Zookeeper
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list) })
except Exception as e:
self.logger.out('Error removing domain from list: {}'.format(e), state='e')
# Start up the VM
def start_vm(self):
# Start the log watcher
self.console_log_instance.start()
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.instart = False
return
# Try to get the current state in case it's already running
try:
self.dom = self.lookupByUUID(self.domuuid)
curstate = self.dom.state()[0]
except:
curstate = 'notstart'
if curstate == libvirt.VIR_DOMAIN_RUNNING:
# If it is running just update the model
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
else:
# Or try to create it
try:
# Grab the domain information from Zookeeper
xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList()
self.logger.out('Successfully started VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = dom
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
except libvirt.libvirtError as e:
self.logger.out('Failed to create VM', state='e', prefix='Domain {}:'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) })
self.dom = None
lv_conn.close()
self.instart = False
# Restart the VM
def restart_vm(self):
self.logger.out('Restarting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.inrestart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.inrestart = False
return
self.shutdown_vm()
time.sleep(0.2)
self.start_vm()
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
lv_conn.close()
self.inrestart = False
# Stop the VM forcibly without updating state
def terminate_vm(self):
self.logger.out('Terminating VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
except AttributeError:
# A missing dom object means the VM is already gone
self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.removeDomainFromList()
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Stop the VM forcibly
def stop_vm(self):
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}:'.format(self.domuuid))
except AttributeError:
# A missing dom object means the VM is already gone
self.logger.out('Failed to stop VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.removeDomainFromList()
if not self.inrestart:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.dom = None
self.instop = False
# Stop the log watcher
self.console_log_instance.stop()
# Shutdown the VM gracefully
def shutdown_vm(self):
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
is_aborted = False
self.inshutdown = True
self.dom.shutdown()
tick = 0
while True:
tick += 2
time.sleep(2)
# Abort shutdown if the state changes to start
current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
if current_state not in ['shutdown', 'restart']:
self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}:'.format(self.domuuid))
is_aborted = True
break
try:
lvdomstate = self.dom.state()[0]
except:
lvdomstate = None
if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
self.removeDomainFromList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.dom = None
# Stop the log watcher
self.console_log_instance.stop()
break
# HARDCODE: 90s is a reasonable amount of time for any operating system to shut down cleanly
if tick >= 90:
self.logger.out('Shutdown timeout expired', state='e', prefix='Domain {}:'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
break
self.inshutdown = False
if is_aborted:
self.manage_vm_state()
if self.inrestart:
# Wait to prevent race conditions
time.sleep(1)
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
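# Send the VM to the destination node via a libvirt live migration over TCP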
def live_migrate_vm(self):
dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
try:
# Open a connection to the destination
dest_lv_conn = libvirt.open(dest_lv)
if not dest_lv_conn:
raise ConnectionError('libvirt.open returned no connection')
except Exception:
self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid))
return False
try:
# Send the live migration; force the destination URI to ensure we transit over the cluster network
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
if not target_dom:
raise RuntimeError('libvirt migrate returned no domain object')
except Exception as e:
self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}:'.format(self.domuuid))
dest_lv_conn.close()
return False
self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
dest_lv_conn.close()
return True
# Migrate the VM to a target host
def migrate_vm(self):
self.inmigrate = True
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid))
migrate_ret = self.live_migrate_vm()
if not migrate_ret:
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' })
else:
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
self.inmigrate = False
# Receive the migration from another host (wait until VM is running)
def receive_migrate(self):
self.inreceive = True
live_receive = True
tick = 0
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
# Wait 1 second and increment the tick
time.sleep(1)
tick += 1
# Get zookeeper state and look for the VM in the local libvirt database
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
# If the dom is found
if self.dom:
lvdomstate = self.dom.state()[0]
if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
# VM has been received and started
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
break
else:
# If the state is no longer migrate
if self.state != 'migrate':
# The receive was aborted before it timed out or was completed
self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}:'.format(self.domuuid))
break
# If the dom is not found
else:
# If the state is changed to shutdown or stop
if self.state == 'shutdown' or self.state == 'stop':
# The receive failed on the remote end, and VM is being shut down instead
live_receive = False
self.logger.out('Send failed on remote end', state='w', prefix='Domain {}:'.format(self.domuuid))
break
# If we've already been waiting 90s for a receive
# HARDCODE: 90s should be plenty of time for even extremely large VMs on reasonable networks
if tick > 90:
# The receive timed out
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
self.logger.out('Receive timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
break
# We are waiting on a shutdown
if not live_receive:
tick = 0
self.logger.out('Waiting for VM to shut down on remote end', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
# Wait 1 second and increment the tick
time.sleep(1)
tick += 1
# Get zookeeper state and look for the VM in the local libvirt database
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
# If the VM has stopped
if self.state == 'stop':
# Wait one more second to avoid race conditions
time.sleep(1)
# Start the VM up
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
break
# If we've already been waiting 120s for a shutdown
# HARDCODE: The remote timeout is 90s, so an extra 30s of buffer
if tick > 120:
# The shutdown timed out; something is very amiss, so switch state to fail and abort
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(self.domuuid): 'fail',
'/domains/{}/failedreason'.format(self.domuuid): 'Timeout waiting for migrate or shutdown'
})
self.logger.out('Shutdown timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
break
self.inreceive = False
#
# Main function to manage a VM (taking only self)
#
def manage_vm_state(self):
# Update the current values from zookeeper
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid))
# Check the current state of the VM
try:
if self.dom is not None:
running, reason = self.dom.state()
else:
raise ValueError('no local libvirt domain object')
except Exception:
running = libvirt.VIR_DOMAIN_NOSTATE
self.logger.out('VM state change for "{}": {} {}'.format(self.domuuid, self.state, self.node), state='i')
#######################
# Handle state changes
#######################
# Valid states are:
# start
# migrate
# restart
# shutdown
# stop
# States we don't (need to) handle are:
# disable
# provision
# Conditional pass one - Are we already performing an action
if not self.instart \
and not self.inrestart \
and not self.inmigrate \
and not self.inreceive \
and not self.inshutdown \
and not self.instop:
# Conditional pass two - Is this VM configured to run on this node
if self.node == self.this_node.name:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM is already running and should be
if self.state == "start":
# Start the log watcher
self.console_log_instance.start()
# Add domain to running list
self.addDomainToList()
# VM is already running and should be but stuck in migrate state
elif self.state == "migrate":
# Start the log watcher
self.console_log_instance.start()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
# Add domain to running list
self.addDomainToList()
# VM should be restarted
elif self.state == "restart":
self.restart_vm()
# VM should be shut down
elif self.state == "shutdown":
self.shutdown_vm()
# VM should be stopped
elif self.state == "stop":
self.stop_vm()
else:
# VM should be started
if self.state == "start":
# Start the domain
self.start_vm()
# VM should be migrated to this node
elif self.state == "migrate":
# Receive the migration
self.receive_migrate()
# VM should be restarted (i.e. started since it isn't running)
if self.state == "restart":
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
# VM should be shut down; ensure it's gone from this node's domain_list
elif self.state == "shutdown":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
# VM should be stopped; ensure it's gone from this node's domain_list
elif self.state == "stop":
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
else:
# Conditional pass three - Is this VM currently running on this node
if running == libvirt.VIR_DOMAIN_RUNNING:
# VM should be migrated away from this node
if self.state == "migrate":
self.migrate_vm()
# VM should be shutdown gracefully
elif self.state == 'shutdown':
self.shutdown_vm()
# VM should be forcibly terminated
else:
self.terminate_vm()
# This function is a wrapper for libvirt.lookupByUUID which fixes some problems
# 1. Takes a text UUID and handles converting it to bytes
# 2. Tries the lookup and returns None instead of raising if it fails
def lookupByUUID(self, tuuid):
# Don't do anything if the VM shouldn't live on this node
if self.node != self.this_node.name:
return None
lv_conn = None
libvirt_name = "qemu:///system"
# Convert the text UUID to bytes
buuid = uuid.UUID(tuuid).bytes
# Try
try:
# Open a libvirt connection
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
return None
# Lookup the UUID
dom = lv_conn.lookupByUUID(buuid)
# Fail
except:
dom = None
# After everything
finally:
# Close the libvirt connection
if lv_conn is not None:
lv_conn.close()
# Return the dom object (or None)
return dom


@ -0,0 +1,856 @@
#!/usr/bin/env python3
# VXNetworkInstance.py - Class implementing a PVC VM network and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import sys
import time
from textwrap import dedent
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
class VXNetworkInstance(object):
# Initialization function
def __init__(self, vni, zk_conn, config, logger, this_node, dns_aggregator):
self.vni = vni
self.zk_conn = zk_conn
self.config = config
self.logger = logger
self.this_node = this_node
self.dns_aggregator = dns_aggregator
self.vni_dev = config['vni_dev']
self.vni_mtu = config['vni_mtu']
self.bridge_dev = config['bridge_dev']
self.nettype = zkhandler.readdata(self.zk_conn, '/networks/{}/nettype'.format(self.vni))
if self.nettype == 'bridged':
self.logger.out(
'Creating new bridged network',
prefix='VNI {}'.format(self.vni),
state='o'
)
self.init_bridged()
elif self.nettype == 'managed':
self.logger.out(
'Creating new managed network',
prefix='VNI {}'.format(self.vni),
state='o'
)
self.init_managed()
else:
self.logger.out(
'Invalid network type {}'.format(self.nettype),
prefix='VNI {}'.format(self.vni),
state='e'
)
# Initialize a bridged network
def init_bridged(self):
self.old_description = None
self.description = None
self.vlan_nic = 'vlan{}'.format(self.vni)
self.bridge_nic = 'vmbr{}'.format(self.vni)
# Zookeeper handlers for changed states
@self.zk_conn.DataWatch('/networks/{}'.format(self.vni))
def watch_network_description(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.description != data.decode('ascii'):
self.old_description = self.description
self.description = data.decode('ascii')
self.createNetworkBridged()
# Initialize a managed network
def init_managed(self):
self.old_description = None
self.description = None
self.domain = None
self.name_servers = None
self.ip6_gateway = zkhandler.readdata(self.zk_conn, '/networks/{}/ip6_gateway'.format(self.vni))
self.ip6_network = zkhandler.readdata(self.zk_conn, '/networks/{}/ip6_network'.format(self.vni))
self.ip6_cidrnetmask = self.ip6_network.split('/')[-1]
self.dhcp6_flag = ( zkhandler.readdata(self.zk_conn, '/networks/{}/dhcp6_flag'.format(self.vni)) == 'True' )
self.ip4_gateway = zkhandler.readdata(self.zk_conn, '/networks/{}/ip4_gateway'.format(self.vni))
self.ip4_network = zkhandler.readdata(self.zk_conn, '/networks/{}/ip4_network'.format(self.vni))
self.ip4_cidrnetmask = self.ip4_network.split('/')[-1]
self.dhcp4_flag = ( zkhandler.readdata(self.zk_conn, '/networks/{}/dhcp4_flag'.format(self.vni)) == 'True' )
self.dhcp4_start = zkhandler.readdata(self.zk_conn, '/networks/{}/dhcp4_start'.format(self.vni))
self.dhcp4_end = zkhandler.readdata(self.zk_conn, '/networks/{}/dhcp4_end'.format(self.vni))
self.vxlan_nic = 'vxlan{}'.format(self.vni)
self.bridge_nic = 'vmbr{}'.format(self.vni)
self.nftables_netconf_filename = '{}/networks/{}.nft'.format(self.config['nft_dynamic_directory'], self.vni)
self.firewall_rules = []
self.dhcp_server_daemon = None
self.dnsmasq_hostsdir = '{}/{}'.format(self.config['dnsmasq_dynamic_directory'], self.vni)
self.dhcp_reservations = []
# Create the network hostsdir
common.run_os_command(
'/bin/mkdir --parents {}'.format(
self.dnsmasq_hostsdir
)
)
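# Base nftables ruleset for this network: a pair of in/out chains for VM
# traffic, plus input-chain rules protecting the router address itself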
self.firewall_rules_base = """# Rules for network {vxlannic}
add chain inet filter {vxlannic}-in
add chain inet filter {vxlannic}-out
add rule inet filter {vxlannic}-in counter
add rule inet filter {vxlannic}-out counter
# Allow ICMP traffic into the router from network
add rule inet filter input ip protocol icmp meta iifname {bridgenic} counter accept
add rule inet filter input ip6 nexthdr icmpv6 meta iifname {bridgenic} counter accept
# Allow DNS, DHCP, and NTP traffic into the router from network
add rule inet filter input tcp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 53 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 67 meta iifname {bridgenic} counter accept
add rule inet filter input udp dport 123 meta iifname {bridgenic} counter accept
add rule inet filter input ip6 nexthdr udp udp dport 547 meta iifname {bridgenic} counter accept
# Allow metadata API into the router from network
add rule inet filter input tcp dport 80 meta iifname {bridgenic} counter accept
# Block traffic into the router from network
add rule inet filter input meta iifname {bridgenic} counter drop
""".format(
vxlannic=self.vxlan_nic,
bridgenic=self.bridge_nic
)
self.firewall_rules_v4 = """# Jump from forward chain to this chain when matching net (IPv4)
add rule inet filter forward ip daddr {netaddr4} counter jump {vxlannic}-in
add rule inet filter forward ip saddr {netaddr4} counter jump {vxlannic}-out
""".format(
netaddr4=self.ip4_network,
vxlannic=self.vxlan_nic,
)
self.firewall_rules_v6 = """# Jump from forward chain to this chain when matching net (IPv6)
add rule inet filter forward ip6 daddr {netaddr6} counter jump {vxlannic}-in
add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
""".format(
netaddr6=self.ip6_network,
vxlannic=self.vxlan_nic,
)
self.firewall_rules_in = zkhandler.listchildren(self.zk_conn, '/networks/{}/firewall_rules/in'.format(self.vni))
self.firewall_rules_out = zkhandler.listchildren(self.zk_conn, '/networks/{}/firewall_rules/out'.format(self.vni))
# Zookeeper handlers for changed states
@self.zk_conn.DataWatch('/networks/{}'.format(self.vni))
def watch_network_description(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.description != data.decode('ascii'):
self.old_description = self.description
self.description = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/domain'.format(self.vni))
def watch_network_domain(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.domain != data.decode('ascii'):
domain = data.decode('ascii')
if self.dhcp_server_daemon:
self.dns_aggregator.remove_network(self)
self.domain = domain
if self.dhcp_server_daemon:
self.dns_aggregator.add_network(self)
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/name_servers'.format(self.vni))
def watch_network_name_servers(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.name_servers != data.decode('ascii'):
name_servers = data.decode('ascii').split(',')
if self.dhcp_server_daemon:
self.dns_aggregator.remove_network(self)
self.name_servers = name_servers
if self.dhcp_server_daemon:
self.dns_aggregator.add_network(self)
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/ip6_network'.format(self.vni))
def watch_network_ip6_network(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip6_network != data.decode('ascii'):
ip6_network = data.decode('ascii')
self.ip6_network = ip6_network
self.ip6_cidrnetmask = ip6_network.split('/')[-1]
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/ip6_gateway'.format(self.vni))
def watch_network_gateway6(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip6_gateway != data.decode('ascii'):
orig_gateway = self.ip6_gateway
if self.this_node.router_state == 'primary':
if orig_gateway:
self.removeGateway6Address()
self.ip6_gateway = data.decode('ascii')
if self.this_node.router_state == 'primary':
self.createGateway6Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/dhcp6_flag'.format(self.vni))
def watch_network_dhcp6_status(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp6_flag != ( data.decode('ascii') == 'True' ):
self.dhcp6_flag = ( data.decode('ascii') == 'True' )
if self.dhcp6_flag and not self.dhcp_server_daemon and self.this_node.router_state == 'primary':
self.startDHCPServer()
elif self.dhcp_server_daemon and not self.dhcp4_flag and self.this_node.router_state == 'primary':
self.stopDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/ip4_network'.format(self.vni))
def watch_network_ip4_network(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip4_network != data.decode('ascii'):
ip4_network = data.decode('ascii')
self.ip4_network = ip4_network
self.ip4_cidrnetmask = ip4_network.split('/')[-1]
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/ip4_gateway'.format(self.vni))
def watch_network_gateway4(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.ip4_gateway != data.decode('ascii'):
orig_gateway = self.ip4_gateway
if self.this_node.router_state == 'primary':
if orig_gateway:
self.removeGateway4Address()
self.ip4_gateway = data.decode('ascii')
if self.this_node.router_state == 'primary':
self.createGateway4Address()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/dhcp4_flag'.format(self.vni))
def watch_network_dhcp4_status(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_flag != ( data.decode('ascii') == 'True' ):
self.dhcp4_flag = ( data.decode('ascii') == 'True' )
if self.dhcp4_flag and not self.dhcp_server_daemon and self.this_node.router_state == 'primary':
self.startDHCPServer()
elif self.dhcp_server_daemon and not self.dhcp6_flag and self.this_node.router_state == 'primary':
self.stopDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/dhcp4_start'.format(self.vni))
def watch_network_dhcp4_start(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_start != data.decode('ascii'):
self.dhcp4_start = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.DataWatch('/networks/{}/dhcp4_end'.format(self.vni))
def watch_network_dhcp4_end(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if data and self.dhcp4_end != data.decode('ascii'):
self.dhcp4_end = data.decode('ascii')
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.ChildrenWatch('/networks/{}/dhcp_reservations'.format(self.vni))
def watch_network_dhcp_reservations(new_reservations, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
if self.dhcp_reservations != new_reservations:
old_reservations = self.dhcp_reservations
self.dhcp_reservations = new_reservations
if self.this_node.router_state == 'primary':
self.updateDHCPReservations(old_reservations, new_reservations)
if self.dhcp_server_daemon:
self.stopDHCPServer()
self.startDHCPServer()
@self.zk_conn.ChildrenWatch('/networks/{}/firewall_rules/in'.format(self.vni))
def watch_network_firewall_rules_in(new_rules, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Don't run on the first pass
if self.firewall_rules_in != new_rules:
self.firewall_rules_in = new_rules
self.updateFirewallRules()
@self.zk_conn.ChildrenWatch('/networks/{}/firewall_rules/out'.format(self.vni))
def watch_network_firewall_rules_out(new_rules, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
# Don't run on the first pass
if self.firewall_rules_out != new_rules:
self.firewall_rules_out = new_rules
self.updateFirewallRules()
self.createNetworkManaged()
self.createFirewall()
def getvni(self):
return self.vni
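# Sync the dnsmasq per-host reservation files with the reservation list from Zookeeper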
def updateDHCPReservations(self, old_reservations_list, new_reservations_list):
for reservation in new_reservations_list:
if reservation not in old_reservations_list:
# Add new reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
ipaddr = zkhandler.readdata(
self.zk_conn,
'/networks/{}/dhcp_reservations/{}/ipaddr'.format(
self.vni,
reservation
)
)
entry = '{},{}'.format(reservation, ipaddr)
# Write the entry
with open(filename, 'w') as outfile:
outfile.write(entry)
for reservation in old_reservations_list:
if reservation not in new_reservations_list:
# Remove old reservation file
filename = '{}/{}'.format(self.dnsmasq_hostsdir, reservation)
try:
os.remove(filename)
self.dhcp_server_daemon.signal('hup')
except:
pass
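# Regenerate this network's nftables ruleset from the Zookeeper ACL lists and reload it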
def updateFirewallRules(self):
if not self.ip4_network:
return
self.logger.out(
'Updating firewall rules',
prefix='VNI {}'.format(self.vni),
state='o'
)
ordered_acls_in = {}
ordered_acls_out = {}
sorted_acl_list = {'in': [], 'out': []}
full_ordered_rules = []
for acl in self.firewall_rules_in:
order = zkhandler.readdata(self.zk_conn, '/networks/{}/firewall_rules/in/{}/order'.format(self.vni, acl))
ordered_acls_in[order] = acl
for acl in self.firewall_rules_out:
order = zkhandler.readdata(self.zk_conn, '/networks/{}/firewall_rules/out/{}/order'.format(self.vni, acl))
ordered_acls_out[order] = acl
for order in sorted(ordered_acls_in.keys()):
sorted_acl_list['in'].append(ordered_acls_in[order])
for order in sorted(ordered_acls_out.keys()):
sorted_acl_list['out'].append(ordered_acls_out[order])
for direction in 'in', 'out':
for acl in sorted_acl_list[direction]:
rule_prefix = "add rule inet filter vxlan{}-{} counter".format(self.vni, direction)
rule_data = zkhandler.readdata(self.zk_conn, '/networks/{}/firewall_rules/{}/{}/rule'.format(self.vni, direction, acl))
rule = '{} {}'.format(rule_prefix, rule_data)
full_ordered_rules.append(rule)
firewall_rules = self.firewall_rules_base
if self.ip6_gateway != 'None':
firewall_rules += self.firewall_rules_v6
if self.ip4_gateway != 'None':
firewall_rules += self.firewall_rules_v4
output = "{}\n# User rules\n{}\n".format(
firewall_rules,
'\n'.join(full_ordered_rules)
)
with open(self.nftables_netconf_filename, 'w') as nfnetfile:
nfnetfile.write(dedent(output))
# Reload firewall rules
nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
common.reload_firewall_rules(self.logger, nftables_base_filename)
# Create bridged network configuration
def createNetworkBridged(self):
self.logger.out(
'Creating bridged vLAN device {} on interface {}'.format(
self.vlan_nic,
self.bridge_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Create vLAN interface
common.run_os_command(
'ip link add link {} name {} type vlan id {}'.format(
self.bridge_dev,
self.vlan_nic,
self.vni
)
)
# Create bridge interface
common.run_os_command(
'brctl addbr {}'.format(
self.bridge_nic
)
)
# Set MTU of vLAN and bridge NICs
vx_mtu = self.vni_mtu
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vlan_nic,
vx_mtu
)
)
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.bridge_nic,
vx_mtu
)
)
# Disable tx checksum offload on bridge interface (breaks DHCP on Debian < 9)
common.run_os_command(
'ethtool -K {} tx off'.format(
self.bridge_nic
)
)
# Disable IPv6 DAD on bridge interface
common.run_os_command(
'sysctl net.ipv6.conf.{}.accept_dad=0'.format(
self.bridge_nic
)
)
# Add vLAN interface to bridge interface
common.run_os_command(
'brctl addif {} {}'.format(
self.bridge_nic,
self.vlan_nic
)
)
# Create managed network configuration
def createNetworkManaged(self):
self.logger.out(
'Creating VXLAN device on interface {}'.format(
self.vni_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Create VXLAN interface
common.run_os_command(
'ip link add {} type vxlan id {} dstport 4789 dev {}'.format(
self.vxlan_nic,
self.vni,
self.vni_dev
)
)
# Create bridge interface
common.run_os_command(
'brctl addbr {}'.format(
self.bridge_nic
)
)
# Set MTU of VXLAN and bridge NICs
vx_mtu = self.vni_mtu - 50
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vxlan_nic,
vx_mtu
)
)
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.bridge_nic,
vx_mtu
)
)
# Disable tx checksum offload on bridge interface (breaks DHCP on Debian < 9)
common.run_os_command(
'ethtool -K {} tx off'.format(
self.bridge_nic
)
)
# Disable IPv6 DAD on bridge interface
common.run_os_command(
'sysctl net.ipv6.conf.{}.accept_dad=0'.format(
self.bridge_nic
)
)
# Add VXLAN interface to bridge interface
common.run_os_command(
'brctl addif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
def createFirewall(self):
if self.nettype == 'managed':
# For future use
self.updateFirewallRules()
def createGateways(self):
if self.nettype == 'managed':
if self.ip6_gateway != 'None':
self.createGateway6Address()
if self.ip4_gateway != 'None':
self.createGateway4Address()
def createGateway6Address(self):
if self.this_node.router_state == 'primary':
self.logger.out(
'Creating gateway {}/{} on interface {}'.format(
self.ip6_gateway,
self.ip6_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.createIPAddress(self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic)
def createGateway4Address(self):
if self.this_node.router_state == 'primary':
self.logger.out(
'Creating gateway {}/{} on interface {}'.format(
self.ip4_gateway,
self.ip4_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.createIPAddress(self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic)
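# Start the dnsmasq DHCP/DNS server for this network on the primary coordinator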
def startDHCPServer(self):
if self.this_node.router_state == 'primary' and self.nettype == 'managed':
self.logger.out(
'Starting dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Recreate the environment we need for dnsmasq
pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE']
dhcp_environment = {
'DNSMASQ_BRIDGE_INTERFACE': self.bridge_nic,
'PVCD_CONFIG_FILE': pvcnoded_config_file
}
# Define the dnsmasq config fragments
dhcp_configuration_base = [
'--domain-needed',
'--bogus-priv',
'--no-hosts',
'--dhcp-authoritative',
'--filterwin2k',
'--expand-hosts',
'--domain={}'.format(self.domain),
'--local=/{}/'.format(self.domain),
'--log-facility=-',
'--log-dhcp',
'--keep-in-foreground',
'--leasefile-ro',
'--dhcp-script={}/pvcnoded/dnsmasq-zookeeper-leases.py'.format(os.getcwd()),
'--dhcp-hostsdir={}'.format(self.dnsmasq_hostsdir),
'--bind-interfaces',
]
dhcp_configuration_v4 = [
'--listen-address={}'.format(self.ip4_gateway),
'--auth-zone={}'.format(self.domain),
'--auth-peer={}'.format(self.ip4_gateway),
'--auth-server={}'.format(self.ip4_gateway),
'--auth-sec-servers={}'.format(self.ip4_gateway),
]
dhcp_configuration_v4_dhcp = [
'--dhcp-option=option:ntp-server,{}'.format(self.ip4_gateway),
'--dhcp-range={},{},48h'.format(self.dhcp4_start, self.dhcp4_end),
]
dhcp_configuration_v6 = [
'--listen-address={}'.format(self.ip6_gateway),
'--auth-zone={}'.format(self.domain),
'--auth-peer={}'.format(self.ip6_gateway),
'--auth-server={}'.format(self.ip6_gateway),
'--auth-sec-servers={}'.format(self.ip6_gateway),
'--dhcp-option=option6:dns-server,[{}]'.format(self.ip6_gateway),
'--dhcp-option=option6:sntp-server,[{}]'.format(self.ip6_gateway),
'--enable-ra',
]
dhcp_configuration_v6_dualstack = [
'--dhcp-range=net:{nic},::,constructor:{nic},ra-stateless,ra-names'.format(nic=self.bridge_nic),
]
dhcp_configuration_v6_only = [
'--dhcp-range=net:{nic},::2,::ffff:ffff:ffff:ffff,constructor:{nic},64,24h'.format(nic=self.bridge_nic),
]
# Assemble the DHCP configuration
dhcp_configuration = dhcp_configuration_base
if self.dhcp6_flag:
dhcp_configuration += dhcp_configuration_v6
if self.dhcp4_flag:
dhcp_configuration += dhcp_configuration_v6_dualstack
else:
dhcp_configuration += dhcp_configuration_v6_only
else:
dhcp_configuration += dhcp_configuration_v4
if self.dhcp4_flag:
dhcp_configuration += dhcp_configuration_v4_dhcp
# Start the dnsmasq process as a daemonized subprocess
print('/usr/sbin/dnsmasq {}'.format(' '.join(dhcp_configuration)))
self.dhcp_server_daemon = common.run_os_daemon(
'/usr/sbin/dnsmasq {}'.format(
' '.join(dhcp_configuration)
),
environment=dhcp_environment,
logfile='{}/dnsmasq-{}.log'.format(self.config['dnsmasq_log_directory'], self.vni)
)
# Remove network
def removeNetwork(self):
if self.nettype == 'bridged':
self.removeNetworkBridged()
elif self.nettype == 'managed':
self.removeNetworkManaged()
# Remove bridged network configuration
def removeNetworkBridged(self):
self.logger.out(
'Removing bridged vLAN device {} on interface {}'.format(
self.vlan_nic,
self.bridge_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link set {} down'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link set {} down'.format(
self.vlan_nic
)
)
common.run_os_command(
'brctl delif {} {}'.format(
self.bridge_nic,
self.vlan_nic
)
)
common.run_os_command(
'brctl delbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link delete {}'.format(
self.vlan_nic
)
)
# Remove managed network configuration
def removeNetworkManaged(self):
self.logger.out(
'Removing VNI device on interface {}'.format(
self.vni_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.run_os_command(
'ip link set {} down'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link set {} down'.format(
self.vxlan_nic
)
)
common.run_os_command(
'brctl delif {} {}'.format(
self.bridge_nic,
self.vxlan_nic
)
)
common.run_os_command(
'brctl delbr {}'.format(
self.bridge_nic
)
)
common.run_os_command(
'ip link delete {}'.format(
self.vxlan_nic
)
)
def removeFirewall(self):
self.logger.out(
'Removing firewall rules',
prefix='VNI {}'.format(self.vni),
state='o'
)
try:
os.remove(self.nftables_netconf_filename)
except:
pass
# Reload firewall rules
nftables_base_filename = '{}/base.nft'.format(self.config['nft_dynamic_directory'])
common.reload_firewall_rules(self.logger, nftables_base_filename)
def removeGateways(self):
if self.nettype == 'managed':
if self.ip6_gateway != 'None':
self.removeGateway6Address()
if self.ip4_gateway != 'None':
self.removeGateway4Address()
def removeGateway6Address(self):
self.logger.out(
'Removing gateway {}/{} from interface {}'.format(
self.ip6_gateway,
self.ip6_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.removeIPAddress(self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic)
def removeGateway4Address(self):
self.logger.out(
'Removing gateway {}/{} from interface {}'.format(
self.ip4_gateway,
self.ip4_cidrnetmask,
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
common.removeIPAddress(self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic)
def stopDHCPServer(self):
if self.nettype == 'managed' and self.dhcp_server_daemon:
self.logger.out(
'Stopping dnsmasq DHCP server on interface {}'.format(
self.bridge_nic
),
prefix='VNI {}'.format(self.vni),
state='o'
)
# Terminate, then kill
self.dhcp_server_daemon.signal('term')
time.sleep(0.2)
self.dhcp_server_daemon.signal('kill')
self.dhcp_server_daemon = None


@ -0,0 +1,260 @@
#!/usr/bin/env python3
# common.py - PVC daemon function library, common functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import subprocess
import threading
import signal
import os
import time
import shlex
import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
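# Wrapper class around a long-running daemonized subprocess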
class OSDaemon(object):
def __init__(self, command_string, environment, logfile):
command = shlex.split(command_string)
# Set stdout to be a logfile if set
if logfile:
stdout = open(logfile, 'a')
else:
stdout = subprocess.PIPE
# Invoke the process
self.proc = subprocess.Popen(
command,
env=environment,
stdout=stdout,
stderr=stdout,
)
# Signal the process
def signal(self, sent_signal):
signal_map = {
'hup': signal.SIGHUP,
'int': signal.SIGINT,
'term': signal.SIGTERM,
'kill': signal.SIGKILL
}
self.proc.send_signal(signal_map[sent_signal])
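# Start a daemon process and return the wrapper object for later signaling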
def run_os_daemon(command_string, environment=None, logfile=None):
daemon = OSDaemon(command_string, environment, logfile)
return daemon
# Run a one-shot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None, timeout=None):
command = shlex.split(command_string)
if background:
def runcmd():
try:
subprocess.run(
command,
env=environment,
timeout=timeout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except subprocess.TimeoutExpired:
pass
thread = threading.Thread(target=runcmd, args=())
thread.start()
return 0, None, None
else:
try:
command_output = subprocess.run(
command,
env=environment,
timeout=timeout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
retcode = command_output.returncode
except subprocess.TimeoutExpired:
# No process output exists after a timeout, so ensure the reads below fail cleanly
command_output = None
retcode = 128
try:
stdout = command_output.stdout.decode('ascii')
except Exception:
stdout = ''
try:
stderr = command_output.stderr.decode('ascii')
except Exception:
stderr = ''
return retcode, stdout, stderr
# Reload the firewall rules of the system
def reload_firewall_rules(logger, rules_file):
logger.out('Reloading firewall configuration', state='o')
retcode, stdout, stderr = run_os_command('/usr/sbin/nft -f {}'.format(rules_file))
if retcode != 0:
logger.out('Failed to reload configuration: {}'.format(stderr), state='e')
# Create IP address
def createIPAddress(ipaddr, cidrnetmask, dev):
run_os_command(
'ip address add {}/{} dev {}'.format(
ipaddr,
cidrnetmask,
dev
)
)
run_os_command(
'arping -P -U -W 0.02 -c 2 -i {dev} -S {ip} {ip}'.format(
dev=dev,
ip=ipaddr
)
)
# Remove IP address
def removeIPAddress(ipaddr, cidrnetmask, dev):
run_os_command(
'ip address delete {}/{} dev {}'.format(
ipaddr,
cidrnetmask,
dev
)
)
#
# Find a migration target
#
def findTargetNode(zk_conn, config, dom_uuid):
# Determine VM node limits; set config value if read fails
try:
node_limit = zkhandler.readdata(zk_conn, '/domains/{}/node_limit'.format(dom_uuid)).split(',')
if not any(node_limit):
node_limit = ''
except:
node_limit = ''
zkhandler.writedata(zk_conn, { '/domains/{}/node_limit'.format(dom_uuid): '' })
# Determine VM search field
try:
search_field = zkhandler.readdata(zk_conn, '/domains/{}/node_selector'.format(dom_uuid))
except Exception as e:
search_field = None
# If our search field is invalid, use and set the default (for next time)
if search_field is None or search_field == 'None':
search_field = config['migration_target_selector']
zkhandler.writedata(zk_conn, { '/domains/{}/node_selector'.format(dom_uuid): config['migration_target_selector'] })
# Execute the search
if search_field == 'mem':
return findTargetNodeMem(zk_conn, node_limit, dom_uuid)
if search_field == 'load':
return findTargetNodeLoad(zk_conn, node_limit, dom_uuid)
if search_field == 'vcpus':
return findTargetNodeVCPUs(zk_conn, node_limit, dom_uuid)
if search_field == 'vms':
return findTargetNodeVMs(zk_conn, node_limit, dom_uuid)
# Nothing was found
return None
# Get the list of valid target nodes
def getNodes(zk_conn, node_limit, dom_uuid):
valid_node_list = []
full_node_list = zkhandler.listchildren(zk_conn, '/nodes')
current_node = zkhandler.readdata(zk_conn, '/domains/{}/node'.format(dom_uuid))
for node in full_node_list:
if node_limit and node not in node_limit:
continue
daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node))
domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node))
if node == current_node:
continue
if daemon_state != 'run' or domain_state != 'ready':
continue
valid_node_list.append(node)
return valid_node_list
# via free memory (relative to allocated memory)
def findTargetNodeMem(zk_conn, node_limit, dom_uuid):
most_allocfree = 0
target_node = None
node_list = getNodes(zk_conn, node_limit, dom_uuid)
for node in node_list:
memalloc = int(zkhandler.readdata(zk_conn, '/nodes/{}/memalloc'.format(node)))
memused = int(zkhandler.readdata(zk_conn, '/nodes/{}/memused'.format(node)))
memfree = int(zkhandler.readdata(zk_conn, '/nodes/{}/memfree'.format(node)))
memtotal = memused + memfree
allocfree = memtotal - memalloc
if allocfree > most_allocfree:
most_allocfree = allocfree
target_node = node
return target_node
# via load average
def findTargetNodeLoad(zk_conn, node_limit, dom_uuid):
least_load = 9999.0
target_node = None
node_list = getNodes(zk_conn, node_limit, dom_uuid)
for node in node_list:
load = float(zkhandler.readdata(zk_conn, '/nodes/{}/cpuload'.format(node)))
if load < least_load:
least_load = load
target_node = node
return target_node
# via total vCPUs
def findTargetNodeVCPUs(zk_conn, node_limit, dom_uuid):
least_vcpus = 9999
target_node = None
node_list = getNodes(zk_conn, node_limit, dom_uuid)
for node in node_list:
vcpus = int(zkhandler.readdata(zk_conn, '/nodes/{}/vcpualloc'.format(node)))
if vcpus < least_vcpus:
least_vcpus = vcpus
target_node = node
return target_node
# via total VMs
def findTargetNodeVMs(zk_conn, node_limit, dom_uuid):
least_vms = 9999
target_node = None
node_list = getNodes(zk_conn, node_limit, dom_uuid)
for node in node_list:
vms = int(zkhandler.readdata(zk_conn, '/nodes/{}/domainscount'.format(node)))
if vms < least_vms:
least_vms = vms
target_node = node
return target_node
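# Example usage (hypothetical, mirroring the fencing logic below): pick a new
# home for a domain and request a migration by updating its Zookeeper keys:
#   target_node = findTargetNode(zk_conn, config, dom_uuid)
#   if target_node is not None:
#       zkhandler.writedata(zk_conn, {
#           '/domains/{}/state'.format(dom_uuid): 'migrate',
#           '/domains/{}/node'.format(dom_uuid): target_node
#       })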


@ -0,0 +1,158 @@
#!/usr/bin/python3
# dnsmasq-zookeeper-leases.py - DNSMASQ leases script for Zookeeper
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import argparse
import os, sys
import kazoo.client
import re
import yaml
#
# Variables
#
#
# General Functions
#
def get_zookeeper_key():
# Get the interface from environment (passed by dnsmasq)
try:
interface = os.environ['DNSMASQ_BRIDGE_INTERFACE']
except Exception as e:
print('ERROR: DNSMASQ_BRIDGE_INTERFACE environment variable not found: {}'.format(e), file=sys.stderr)
exit(1)
# Get the ID of the interface (the digits)
network_vni = re.findall(r'\d+', interface)[0]
# Create the key
zookeeper_key = '/networks/{}/dhcp4_leases'.format(network_vni)
return zookeeper_key
def get_lease_expiry():
try:
expiry = os.environ['DNSMASQ_LEASE_EXPIRES']
except:
expiry = '0'
return expiry
def get_client_id():
try:
client_id = os.environ['DNSMASQ_CLIENT_ID']
except:
client_id = '*'
return client_id
def connect_zookeeper():
# We expect the environ to contain the config file
try:
pvcnoded_config_file = os.environ['PVCD_CONFIG_FILE']
except:
# Default place
pvcnoded_config_file = '/etc/pvc/pvcnoded.yaml'
with open(pvcnoded_config_file, 'r') as cfgfile:
try:
o_config = yaml.safe_load(cfgfile)
except Exception as e:
print('ERROR: Failed to parse configuration file: {}'.format(e), file=sys.stderr)
exit(1)
try:
zk_conn = kazoo.client.KazooClient(hosts=o_config['pvc']['cluster']['coordinators'])
zk_conn.start()
except Exception as e:
print('ERROR: Failed to connect to Zookeeper: {}'.format(e), file=sys.stderr)
exit(1)
return zk_conn
def read_data(zk_conn, key):
return zk_conn.get(key)[0].decode('ascii')
def get_lease(zk_conn, zk_leases_key, macaddr):
expiry = read_data(zk_conn, '{}/{}/expiry'.format(zk_leases_key, macaddr))
ipaddr = read_data(zk_conn, '{}/{}/ipaddr'.format(zk_leases_key, macaddr))
hostname = read_data(zk_conn, '{}/{}/hostname'.format(zk_leases_key, macaddr))
clientid = read_data(zk_conn, '{}/{}/clientid'.format(zk_leases_key, macaddr))
return expiry, ipaddr, hostname, clientid
#
# Command Functions
#
def read_lease_database(zk_conn, zk_leases_key):
leases_list = zk_conn.get_children(zk_leases_key)
output_list = []
for macaddr in leases_list:
expiry, ipaddr, hostname, clientid = get_lease(zk_conn, zk_leases_key, macaddr)
data_string = '{} {} {} {} {}'.format(expiry, macaddr, ipaddr, hostname, clientid)
print('Reading lease from Zookeeper: {}'.format(data_string), file=sys.stderr)
output_list.append('{}'.format(data_string))
# Output list
print('\n'.join(output_list))
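# Add a new lease to Zookeeper as a transaction of per-field child keys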
def add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clientid):
if not hostname:
hostname = ''
transaction = zk_conn.transaction()
transaction.create('{}/{}'.format(zk_leases_key, macaddr), ''.encode('ascii'))
transaction.create('{}/{}/expiry'.format(zk_leases_key, macaddr), expiry.encode('ascii'))
transaction.create('{}/{}/ipaddr'.format(zk_leases_key, macaddr), ipaddr.encode('ascii'))
transaction.create('{}/{}/hostname'.format(zk_leases_key, macaddr), hostname.encode('ascii'))
transaction.create('{}/{}/clientid'.format(zk_leases_key, macaddr), clientid.encode('ascii'))
transaction.commit()
def del_lease(zk_conn, zk_leases_key, macaddr, expiry):
zk_conn.delete('{}/{}'.format(zk_leases_key, macaddr), recursive=True)
#
# Instantiate the parser
#
parser = argparse.ArgumentParser(description='Store or retrieve dnsmasq leases in Zookeeper')
parser.add_argument('action', type=str, help='Action')
parser.add_argument('macaddr', type=str, help='MAC Address', nargs='?', default=None)
parser.add_argument('ipaddr', type=str, help='IP Address', nargs='?', default=None)
parser.add_argument('hostname', type=str, help='Hostname', nargs='?', default=None)
args = parser.parse_args()
action = args.action
macaddr = args.macaddr
ipaddr = args.ipaddr
hostname = args.hostname
zk_conn = connect_zookeeper()
zk_leases_key = get_zookeeper_key()
if action == 'init':
read_lease_database(zk_conn, zk_leases_key)
exit(0)
expiry = get_lease_expiry()
clientid = get_client_id()
#
# Choose action
#
print('Lease action - {} {} {} {}'.format(action, macaddr, ipaddr, hostname), file=sys.stderr)
if action == 'add':
add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clientid)
elif action == 'del':
del_lease(zk_conn, zk_leases_key, macaddr, expiry)
elif action == 'old':
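# 'old' is sent by dnsmasq for leases it already knows about; nothing to update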
pass


@ -0,0 +1,134 @@
#!/usr/bin/env python3
# fencing.py - PVC daemon function library, node fencing functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import threading
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
import pvcnoded.VMInstance as VMInstance
#
# Fence thread entry function
#
def fenceNode(node_name, zk_conn, config, logger):
failcount = 0
# We allow exactly 3 saving throws for the host to come back online
while failcount < 3:
# Wait 5 seconds
time.sleep(5)
# Get the state
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
# Is it still 'dead'
if node_daemon_state == 'dead':
failcount += 1
logger.out('Node "{}" failed {} saving throws'.format(node_name, failcount), state='w')
# It changed back to something else so it must be alive
else:
logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
return
logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='w')
# Get IPMI information
ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name))
ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name))
ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name))
# Shoot it in the head
fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
# Hold to ensure the fence takes effect
time.sleep(3)
# Force into secondary network state if needed
if node_name in config['coordinators']:
zkhandler.writedata(zk_conn, { '/nodes/{}/routerstate'.format(node_name): 'secondary' })
if zkhandler.readdata(zk_conn, '/primary_node') == node_name:
zkhandler.writedata(zk_conn, { '/primary_node': 'none' })
# If the fence succeeded and successful_fence is migrate
if fence_status and config['successful_fence'] == 'migrate':
migrateFromFencedNode(zk_conn, node_name, config, logger)
# If the fence failed and failed_fence is migrate
if not fence_status and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
migrateFromFencedNode(zk_conn, node_name, config, logger)
# Migrate hosts away from a fenced node
def migrateFromFencedNode(zk_conn, node_name, config, logger):
logger.out('Migrating VMs from dead node "{}" to new hosts'.format(node_name), state='i')
dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split()
for dom_uuid in dead_node_running_domains:
VMInstance.flush_locks(zk_conn, logger, dom_uuid)
target_node = common.findTargetNode(zk_conn, config, dom_uuid)
if target_node is not None:
logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
zkhandler.writedata(zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'start',
'/domains/{}/node'.format(dom_uuid): target_node,
'/domains/{}/lastnode'.format(dom_uuid): node_name
})
else:
logger.out('No target node found for VM "{}"; VM will autostart on next unflush/ready of current node'.format(dom_uuid), state='i')
zkhandler.writedata(zk_conn, {
'/domains/{}/state'.format(dom_uuid): 'stopped',
'/domains/{}/node_autostart'.format(dom_uuid): 'True'
})
# Set node in flushed state for easy remigrating when it comes back
zkhandler.writedata(zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' })
#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
# Forcibly reboot the node
ipmi_command_reset = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power reset'.format(
ipmi_hostname, ipmi_user, ipmi_password
)
ipmi_reset_retcode, ipmi_reset_stdout, ipmi_reset_stderr = common.run_os_command(ipmi_command_reset)
time.sleep(2)
# Ensure the node is powered on
ipmi_command_status = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power status'.format(
ipmi_hostname, ipmi_user, ipmi_password
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(ipmi_command_status)
# Trigger a power start if needed
if ipmi_status_stdout != "Chassis Power is on":
ipmi_command_start = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power on'.format(
ipmi_hostname, ipmi_user, ipmi_password
)
ipmi_start_retcode, ipmi_start_stdout, ipmi_start_stderr = common.run_os_command(ipmi_command_start)
# Declare success or failure
if ipmi_reset_retcode == 0:
logger.out('Successfully rebooted dead node', state='o')
return True
else:
logger.out('Failed to reboot dead node', state='e')
print(ipmi_reset_stderr)
return False


@ -0,0 +1,13 @@
#!/bin/bash
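# Clear stale RBD locks on a VM's disks; ${vm} and ${BLSE_STORAGE_POOL_VM} are
# expected to be set by the calling environment.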
for disk in $( sudo rbd list ${BLSE_STORAGE_POOL_VM} | grep "^${vm}" ); do
echo -e " Disk: $disk"
locks="$( sudo rbd lock list ${BLSE_STORAGE_POOL_VM}/${disk} | grep '^client' )"
echo "${locks}"
if [[ -n "${locks}" ]]; then
echo -e " LOCK FOUND! Clearing."
locker="$( awk '{ print $1 }' <<<"${locks}" )"
id="$( awk '{ print $2" "$3 }' <<<"${locks}" )"
sudo rbd lock remove ${BLSE_STORAGE_POOL_VM}/${disk} "${id}" "${locker}"
fi
done

node-daemon/pvcnoded/log.py

@ -0,0 +1,125 @@
#!/usr/bin/env python3
# log.py - Output (stdout + logfile) functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import datetime
class Logger(object):
    # Define a logger class for a daemon instance
    # Keeps a record of where to log, and formats the messages it is passed
    # in various ways based on their state and the daemon configuration.
    # ANSI colours for output
fmt_red = '\033[91m'
fmt_blue = '\033[94m'
fmt_cyan = '\033[96m'
fmt_green = '\033[92m'
fmt_yellow = '\033[93m'
fmt_purple = '\033[95m'
fmt_bold = '\033[1m'
fmt_end = '\033[0m'
last_colour = ''
last_prompt = ''
# Format maps
format_map_colourized = {
# Colourized formatting with chevron prompts (log_colours = True)
'o': { 'colour': fmt_green, 'prompt': '>>> ' },
'e': { 'colour': fmt_red, 'prompt': '>>> ' },
'w': { 'colour': fmt_yellow, 'prompt': '>>> ' },
't': { 'colour': fmt_purple, 'prompt': '>>> ' },
'i': { 'colour': fmt_blue, 'prompt': '>>> ' },
's': { 'colour': fmt_cyan, 'prompt': '>>> ' },
        'x': { 'colour': last_colour, 'prompt': last_prompt }  # continuation; resolved at runtime in out()
}
format_map_textual = {
# Uncolourized formatting with text prompts (log_colours = False)
'o': { 'colour': '', 'prompt': 'ok: ' },
'e': { 'colour': '', 'prompt': 'failed: ' },
'w': { 'colour': '', 'prompt': 'warning: ' },
't': { 'colour': '', 'prompt': 'tick: ' },
'i': { 'colour': '', 'prompt': 'info: ' },
's': { 'colour': '', 'prompt': 'system: ' },
        'x': { 'colour': '', 'prompt': last_prompt }  # continuation; resolved at runtime in out()
}
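    # Illustrative rendered output for an 'i' (info) message, with a
    # hypothetical timestamp and no prefix:
    #   colourized: '>>> 2020/02/08 19:16:19.000000 - Starting daemon'
    #               (the '>>> ' chevron rendered in blue)
    #   textual:    'info: 2020/02/08 19:16:19.000000 - Starting daemon'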
# Initialization of instance
def __init__(self, config):
self.config = config
if self.config['file_logging']:
self.logfile = self.config['log_directory'] + '/pvc.log'
# We open the logfile for the duration of our session, but have a hup function
self.writer = open(self.logfile, 'a', buffering=1)
self.last_colour = ''
self.last_prompt = ''
# Provide a hup function to close and reopen the writer
def hup(self):
self.writer.close()
        # Text-mode files cannot be unbuffered in Python 3; line-buffer as in __init__
        self.writer = open(self.logfile, 'a', buffering=1)
# Output function
def out(self, message, state=None, prefix=''):
# Get the date
if self.config['log_dates']:
date = '{} - '.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'))
else:
date = ''
# Get the format map
if self.config['log_colours']:
format_map = self.format_map_colourized
endc = Logger.fmt_end
else:
format_map = self.format_map_textual
endc = ''
# Define an undefined state as 'x'; no date in these prompts
if not state:
state = 'x'
date = ''
        # Get colour and prompt from the map; the 'x' (continuation) state
        # repeats the colour and prompt of the previous message, which are
        # only known at runtime on the instance
        if state == 'x':
            colour = self.last_colour
            prompt = self.last_prompt
        else:
            colour = format_map[state]['colour']
            prompt = format_map[state]['prompt']
        # Append a separator to a non-empty prefix
        if prefix != '':
            prefix = prefix + ' - '
# Assemble message string
message = colour + prompt + endc + date + prefix + message
# Log to stdout
if self.config['stdout_logging']:
print(message)
# Log to file
if self.config['file_logging']:
self.writer.write(message + '\n')
# Set last message variables
self.last_colour = colour
self.last_prompt = prompt
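For context, a minimal usage sketch of this Logger follows; the config keys are the ones the class above reads, while the values and messages are illustrative:

#!/usr/bin/env python3
# Minimal sketch: drive the Logger above with a hand-built config
import pvcnoded.log as log

config = {
    'log_colours': True,     # use the colourized format map
    'log_dates': True,       # prepend a timestamp to stated messages
    'stdout_logging': True,  # print to stdout
    'file_logging': False,   # skip the logfile (log_directory not needed)
}
logger = log.Logger(config)
logger.out('Node daemon starting', state='s')
logger.out('continuation line')  # no state maps to 'x': repeats the last prompt, no date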

View File

@ -0,0 +1,132 @@
#!/usr/bin/env python3
# zkhandler.py - Secure versioned ZooKeeper updates
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2020 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import kazoo.client
import uuid
# Child list function
def listchildren(zk_conn, key):
children = zk_conn.get_children(key)
return children
# Key deletion function
def deletekey(zk_conn, key, recursive=True):
zk_conn.delete(key, recursive=recursive)
# Data read function
def readdata(zk_conn, key):
    # get() returns a (data, ZnodeStat) tuple; only the data is needed here
    data_raw = zk_conn.get(key)
    data = data_raw[0].decode('utf8')
    return data
# Data write function
def writedata(zk_conn, kv):
# Start up a transaction
zk_transaction = zk_conn.transaction()
# Proceed one KV pair at a time
for key in sorted(kv):
data = kv[key]
if not data:
data = ''
# Check if this key already exists or not
if not zk_conn.exists(key):
# We're creating a new key
zk_transaction.create(key, str(data).encode('utf8'))
else:
# We're updating a key with version validation
orig_data = zk_conn.get(key)
version = orig_data[1].version
# Set what we expect the new version to be
new_version = version + 1
# Update the data
zk_transaction.set_data(key, str(data).encode('utf8'))
            # Queue the version check; a concurrent modification will cause
            # the commit below to fail rather than silently clobber newer data
            try:
                zk_transaction.check(key, new_version)
            except TypeError:
                print('Zookeeper key "{}" could not be queued for a version check'.format(key))
                return False
# Commit the transaction
try:
zk_transaction.commit()
return True
except Exception:
return False
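# Illustrative example: the fencing code earlier uses writedata() to flip a
# VM's state and node atomically in one transaction (UUID hypothetical):
#   writedata(zk_conn, {
#       '/domains/1234-abcd/state': 'start',
#       '/domains/1234-abcd/node': 'hv2'
#   })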
# Key rename function
def renamekey(zk_conn, kv):
    # This one is not transactional because ZooKeeper transactions do not
    # support the recursive delete or recursive create operations we need,
    # so the rename is performed as separate operations and is not atomic.
# Proceed one KV pair at a time
for key in sorted(kv):
old_name = key
new_name = kv[key]
old_data = zk_conn.get(old_name)[0]
        # Find all child keys of old_name recursively, parents before children,
        # so that data on intermediate keys is preserved as well as on leaves
        child_keys = list()
        def get_children(key):
            for ckey in zk_conn.get_children(key):
                child_key = '{}/{}'.format(key, ckey)
                child_keys.append(child_key)
                get_children(child_key)
        get_children(old_name)
# Get the data out of each of the child keys
child_data = dict()
for ckey in child_keys:
child_data[ckey] = zk_conn.get(ckey)[0]
# Create the new parent key
zk_conn.create(new_name, old_data, makepath=True)
# For each child key, create the key and add the data
for ckey in child_keys:
new_ckey_name = ckey.replace(old_name, new_name)
zk_conn.create(new_ckey_name, child_data[ckey], makepath=True)
# Remove recursively the old key
zk_conn.delete(old_name, recursive=True)
# Write lock function
def writelock(zk_conn, key):
lock_id = str(uuid.uuid1())
lock = zk_conn.WriteLock('{}'.format(key), lock_id)
return lock
# Read lock function
def readlock(zk_conn, key):
lock_id = str(uuid.uuid1())
lock = zk_conn.ReadLock('{}'.format(key), lock_id)
return lock
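Finally, a minimal sketch of how the helpers above fit together; the ZooKeeper host and key paths are illustrative:

#!/usr/bin/env python3
# Minimal sketch: read a key under a read lock using the helpers above
import kazoo.client
import pvcnoded.zkhandler as zkhandler

zk_conn = kazoo.client.KazooClient(hosts='127.0.0.1:2181')  # illustrative host
zk_conn.start()
# kazoo lock recipes are context managers, so the lock is released on exit
with zkhandler.readlock(zk_conn, '/domains/1234-abcd/state'):
    state = zkhandler.readdata(zk_conn, '/domains/1234-abcd/state')
print(state)
zk_conn.stop()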