Implements the storing of three VM metadata attributes:

1. Node limits - allows specifying a list of hosts on which the VM must run. This limit influences the migration behaviour of VMs.

2. Per-VM node selectors - allows each VM to have its migration autoselection method specified, to automatically allow different methods per VM based on the administrator's preferences.

3. VM autorestart - allows a VM to be automatically restarted from a stopped state, presumably due to a failure to find a target node (either due to limits or otherwise) during a flush/fence recovery, on the next node unflush/ready state of its home hypervisor. Useful mostly in conjunction with limits to ensure that VMs which were shut down due to there being no valid migration targets are started back up when their node becomes ready again.

Includes the full client interaction with these metadata options, including printing, as well as defining a new function to modify this metadata. For the CLI it is set/modified either on `vm define` or via the `vm meta` command. For the API it is set/modified either on a POST to the `/vm` endpoint (during VM definition) or on POST to the `/vm/<vm>` endpoint. For the API this replaces the previous reserved word for VM creation from scratch as this will no longer be implemented in-daemon (see #22).

Closes #52
466 lines
19 KiB
Python
466 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# NodeInstance.py - Class implementing a PVC node in pvcd
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import os
|
|
import sys
|
|
import psutil
|
|
import socket
|
|
import time
|
|
import libvirt
|
|
import threading
|
|
|
|
import pvcd.log as log
|
|
import pvcd.zkhandler as zkhandler
|
|
import pvcd.common as common
|
|
|
|
class NodeInstance(object):
    """In-daemon representation of one PVC node.

    An instance mirrors the node's state keys under ``/nodes/<name>/`` in
    Zookeeper through data watches registered in ``__init__``.  When the
    instance represents the local node (``name == this_node``) it also reacts
    to state changes: it switches the router between primary and secondary
    roles and flushes/unflushes VMs in a background thread.
    """

    # Initialization function
    def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator):
        """Store the passed-in handles and register the Zookeeper watchers.

        Args:
            name: Name of the node this instance represents.
            this_node: Name of the node the daemon is running on.
            zk_conn: Zookeeper connection; must provide ``DataWatch``.
            config: Daemon configuration mapping; ``enable_networking`` and
                ``enable_api`` are read here and in the router transitions.
            logger: Logger object exposing ``out(message, state=...)``.
            d_node: Mapping of node objects.
            d_network: Mapping of network objects (each exposing ``vni``).
            d_domain: Mapping of domain (VM) objects, keyed by domain UUID.
            dns_aggregator: Object exposing ``start_aggregator()`` and
                ``stop_aggregator()``.
        """
        # Passed-in variables on creation
        self.name = name
        self.this_node = this_node
        self.zk_conn = zk_conn
        self.config = config
        self.logger = logger
        # Which node is primary
        self.primary_node = None
        # States
        self.daemon_mode = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonmode'.format(self.name))
        self.daemon_state = 'stop'
        self.router_state = 'client'
        self.domain_state = 'ready'
        # Object lists
        self.d_node = d_node
        self.d_network = d_network
        self.d_domain = d_domain
        self.dns_aggregator = dns_aggregator
        # Printable lists
        self.active_node_list = []
        self.flushed_node_list = []
        self.inactive_node_list = []
        self.network_list = []
        self.domain_list = []
        # Node resources
        self.domains_count = 0
        self.memused = 0
        self.memfree = 0
        self.memalloc = 0
        self.vcpualloc = 0
        # Floating upstreams
        if self.config['enable_networking']:
            self.vni_dev = self.config['vni_dev']
            self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
            self.upstream_dev = self.config['upstream_dev']
            self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
        else:
            self.vni_dev = None
            self.vni_ipaddr = None
            self.vni_cidrnetmask = None
            self.upstream_dev = None
            self.upstream_ipaddr = None
            self.upstream_cidrnetmask = None
        # Threads
        self.flush_thread = None
        # Flags
        self.flush_stopper = False

        # Zookeeper handlers for changed states
        #
        # Every watcher follows the same shape: stop watching once the key is
        # deleted, decode the raw payload (falling back to a default when the
        # payload has no ``decode`` attribute, e.g. an empty key), and store the
        # value on the instance when it changed.
        @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
        def watch_node_daemonstate(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 'stop'

            if data != self.daemon_state:
                self.daemon_state = data

        @self.zk_conn.DataWatch('/nodes/{}/routerstate'.format(self.name))
        def watch_node_routerstate(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 'client'

            if self.name == self.this_node and self.daemon_mode == 'coordinator':
                # We're a coordinator so we care about networking
                if data != self.router_state:
                    self.router_state = data
                    if self.config['enable_networking']:
                        if self.router_state == 'primary':
                            self.become_primary()
                        else:
                            self.become_secondary()

        @self.zk_conn.DataWatch('/nodes/{}/domainstate'.format(self.name))
        def watch_node_domainstate(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 'unknown'

            if data != self.domain_state:
                self.domain_state = data

                # toggle state management of this node
                if self.name == self.this_node:
                    # Stop any existing flush jobs
                    self._stop_running_flush()
                    # Do flushing in a thread so it doesn't block the migrates out
                    if self.domain_state == 'flush':
                        self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={})
                        self.flush_thread.start()
                    # Do unflushing in a thread so it doesn't block the migrates in
                    if self.domain_state == 'unflush':
                        self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={})
                        self.flush_thread.start()

        # NOTE(review): the resource watchers below store the decoded *string*
        # from Zookeeper, while the fallback (and the initial value) is the
        # integer 0 - consumers presumably convert as needed; confirm.
        @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
        def watch_node_memfree(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 0

            if data != self.memfree:
                self.memfree = data

        @self.zk_conn.DataWatch('/nodes/{}/memused'.format(self.name))
        def watch_node_memused(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 0

            if data != self.memused:
                self.memused = data

        @self.zk_conn.DataWatch('/nodes/{}/memalloc'.format(self.name))
        def watch_node_memalloc(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 0

            if data != self.memalloc:
                self.memalloc = data

        @self.zk_conn.DataWatch('/nodes/{}/vcpualloc'.format(self.name))
        def watch_node_vcpualloc(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 0

            if data != self.vcpualloc:
                self.vcpualloc = data

        @self.zk_conn.DataWatch('/nodes/{}/runningdomains'.format(self.name))
        def watch_node_runningdomains(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                # The key holds a whitespace-separated list of domain UUIDs
                data = data.decode('ascii').split()
            except AttributeError:
                data = []

            if data != self.domain_list:
                self.domain_list = data

        @self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
        def watch_node_domainscount(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = 0

            if data != self.domains_count:
                self.domains_count = data

    def _stop_running_flush(self):
        """Ask a running flush/unflush worker to stop and wait until it has.

        The worker checks ``flush_stopper`` before each VM and clears it when
        it aborts or finishes.  The wait also ends as soon as the worker thread
        is no longer alive, so a worker that already finished, or that died
        from an unhandled exception without clearing the flag, cannot block
        the caller forever.  ``flush_stopper`` is always False on return.
        """
        worker = self.flush_thread
        if worker is not None:
            self.flush_stopper = True
            self.logger.out('Waiting for previous migration to complete', state='i')
            while self.flush_stopper and worker.is_alive():
                time.sleep(1)
        self.flush_stopper = False

    # Update value functions
    def update_node_list(self, d_node):
        """Replace the stored mapping of node objects."""
        self.d_node = d_node

    def update_network_list(self, d_network):
        """Replace the stored network mapping and rebuild ``network_list``.

        ``network_list`` receives the ``vni`` attribute of every network in
        ``d_network``, in the mapping's iteration order.
        """
        self.d_network = d_network
        self.network_list = [d_network[network].vni for network in d_network]

    def update_domain_list(self, d_domain):
        """Replace the stored mapping of domain (VM) objects."""
        self.d_domain = d_domain

    # Routing primary/secondary states
    def become_secondary(self):
        """Release the primary router role on this node.

        Stops the PVC API service (when ``enable_api`` is set), stops the DHCP
        server and removes the gateways of every known network, removes the
        floating addresses and stops the DNS aggregator.
        """
        self.logger.out('Setting router {} to secondary state'.format(self.name), state='i')
        self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
        time.sleep(2)
        if self.config['enable_api']:
            self.logger.out('Stopping PVC API client service', state='i')
            common.run_os_command("systemctl stop pvc-api.service")
        for network in self.d_network:
            self.d_network[network].stopDHCPServer()
            self.d_network[network].removeGateways()
        self.removeFloatingAddresses()
        self.dns_aggregator.stop_aggregator()

    def become_primary(self):
        """Take over the primary router role on this node.

        While holding the ``/primary_node`` write lock: creates the floating
        addresses, brings up gateways and DHCP servers for every known
        network, starts the PVC API service (when ``enable_api`` is set),
        tries up to five times to switch the Patroni leader to this node, and
        finally starts the DNS aggregator.
        """
        # Establish a lock
        with zkhandler.writelock(self.zk_conn, '/primary_node'):
            self.logger.out('Setting router {} to primary state'.format(self.name), state='i')

            # Create floating addresses
            self.logger.out('Network list: {}'.format(', '.join(self.network_list)), state='i')
            self.createFloatingAddresses()
            # Start up the gateways and DHCP servers
            for network in self.d_network:
                self.d_network[network].createGateways()
                self.d_network[network].startDHCPServer()
            if self.config['enable_api']:
                self.logger.out('Starting PVC API client service', state='i')
                common.run_os_command("systemctl start pvc-api.service")
            time.sleep(1)

            # Switch Patroni leader to the local instance
            self.logger.out('Setting Patroni leader to this node', state='i')
            tick = 1
            while True:
                retcode, stdout, stderr = common.run_os_command(
                    """
                    patronictl
                        -c /etc/patroni/config.yml
                        -d zookeeper://localhost:2181
                        switchover
                        --candidate {}
                        --force
                        pvcdns
                    """.format(self.name)
                )
                if stdout:
                    # Any output on stdout is treated as a successful switchover
                    self.logger.out('Successfully switched Patroni leader\n{}'.format(stdout), state='o')
                    break
                elif stderr == "Error: Switchover target and source are the same.\n":
                    # We already are the leader; nothing to do
                    self.logger.out('Failed to switch Patroni leader to ourselves; this is fine\n{}'.format(stderr), state='w')
                    break
                elif tick >= 5:
                    self.logger.out('Failed to switch Patroni leader after 5 tries; aborting\n{}'.format(stderr), state='e')
                    break
                else:
                    self.logger.out('Failed to switch Patroni leader; retrying [{}/5]\n{}'.format(tick, stderr), state='e')
                    tick += 1
                    time.sleep(2)

            # Start the DNS aggregator instance
            time.sleep(1)
            self.dns_aggregator.start_aggregator()

    def createFloatingAddresses(self):
        """Add the floating management and upstream IP addresses.

        NOTE(review): the management address is always placed on the literal
        interface 'brcluster'; ``self.vni_dev`` is stored but not used here.
        """
        # VNI floating IP
        self.logger.out(
            'Creating floating management IP {}/{} on interface {}'.format(
                self.vni_ipaddr,
                self.vni_cidrnetmask,
                'brcluster'
            ),
            state='o'
        )
        common.createIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
        # Upstream floating IP
        self.logger.out(
            'Creating floating upstream IP {}/{} on interface {}'.format(
                self.upstream_ipaddr,
                self.upstream_cidrnetmask,
                self.upstream_dev
            ),
            state='o'
        )
        common.createIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)

    def removeFloatingAddresses(self):
        """Remove the floating management and upstream IP addresses.

        Mirrors ``createFloatingAddresses``: the management address is removed
        from the literal interface 'brcluster'.
        """
        # VNI floating IP
        self.logger.out(
            'Removing floating management IP {}/{} from interface {}'.format(
                self.vni_ipaddr,
                self.vni_cidrnetmask,
                'brcluster'
            ),
            state='o'
        )
        common.removeIPAddress(self.vni_ipaddr, self.vni_cidrnetmask, 'brcluster')
        # Upstream floating IP
        self.logger.out(
            'Removing floating upstream IP {}/{} from interface {}'.format(
                self.upstream_ipaddr,
                self.upstream_cidrnetmask,
                self.upstream_dev
            ),
            state='o'
        )
        common.removeIPAddress(self.upstream_ipaddr, self.upstream_cidrnetmask, self.upstream_dev)

    # Flush all VMs on the host
    def flush(self):
        """Migrate every running VM off this node, one at a time.

        Intended to run in ``flush_thread``.  For each VM a target is chosen
        with ``common.findTargetHypervisor``; when none is found the VM is
        shut down and flagged with ``node_autostart`` so ``unflush`` starts it
        again.  The loop aborts before the next VM when ``flush_stopper`` is
        set.  On completion the node's ``domainstate`` is set to 'flushed'.
        """
        # Begin flush
        self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
        self.logger.out('VM list: {}'.format(', '.join(self.domain_list)), state='i')
        # Work on a snapshot; the watcher replaces domain_list as VMs leave
        fixed_domain_list = self.domain_list.copy()
        for dom_uuid in fixed_domain_list:
            # Allow us to cancel the operation
            if self.flush_stopper:
                self.logger.out('Aborting node flush', state='i')
                self.flush_thread = None
                self.flush_stopper = False
                return

            self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')

            target_node = common.findTargetHypervisor(self.zk_conn, self.config, dom_uuid)

            # Don't replace the previous node if the VM is already migrated:
            # keep an existing lastnode, otherwise remember the current node.
            # The key is read only once so the value tested is the value used.
            current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
            if not current_node:
                current_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))

            if target_node is None:
                self.logger.out('Failed to find migration target for VM "{}"; shutting down and setting autostart flag'.format(dom_uuid), state='e')
                zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' })
                zkhandler.writedata(self.zk_conn, { '/domains/{}/node_autostart'.format(dom_uuid): 'True' })

                # Wait for the VM to shut down
                while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) != 'stop':
                    time.sleep(1)

                continue

            self.logger.out('Migrating VM "{}" to node "{}"'.format(dom_uuid, target_node), state='i')
            zkhandler.writedata(self.zk_conn, {
                '/domains/{}/state'.format(dom_uuid): 'migrate',
                '/domains/{}/node'.format(dom_uuid): target_node,
                '/domains/{}/lastnode'.format(dom_uuid): current_node
            })

            # Wait for the VM to migrate so the next VM's free RAM count is accurate (they migrate in serial anyways)
            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) != 'start':
                time.sleep(1)

        zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' })
        zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
        # Clear the thread handle before the flag: a waiter proceeds once the
        # flag drops and may then install a new thread handle
        self.flush_thread = None
        self.flush_stopper = False

    def unflush(self):
        """Bring VMs back to this node and return it to the 'ready' state.

        Intended to run in ``flush_thread``.  VMs assigned to this node with
        ``node_autostart`` set to 'True' are started; VMs whose ``lastnode``
        is this node are migrated back, one at a time.  The loop aborts before
        the next VM when ``flush_stopper`` is set.
        """
        self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')
        # Work on a snapshot so changes to d_domain do not disturb the loop
        fixed_domain_list = self.d_domain.copy()
        for dom_uuid in fixed_domain_list:
            # Allow us to cancel the operation
            if self.flush_stopper:
                self.logger.out('Aborting node unflush', state='i')
                self.flush_thread = None
                self.flush_stopper = False
                return

            # Handle autostarts
            autostart = zkhandler.readdata(self.zk_conn, '/domains/{}/node_autostart'.format(dom_uuid))
            node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(dom_uuid))
            if autostart == 'True' and node == self.name:
                self.logger.out('Starting autostart VM "{}"'.format(dom_uuid), state='i')
                zkhandler.writedata(self.zk_conn, {
                    '/domains/{}/state'.format(dom_uuid): 'start',
                    '/domains/{}/node'.format(dom_uuid): self.name,
                    '/domains/{}/lastnode'.format(dom_uuid): '',
                    '/domains/{}/node_autostart'.format(dom_uuid): 'False'
                })
                continue

            # Best effort: a VM whose lastnode cannot be read is skipped.
            # Only Exception is caught so interpreter-exit signals propagate.
            try:
                last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
            except Exception:
                continue

            if last_node != self.name:
                continue

            self.logger.out('Setting unmigration for VM "{}"'.format(dom_uuid), state='i')
            zkhandler.writedata(self.zk_conn, {
                '/domains/{}/state'.format(dom_uuid): 'migrate',
                '/domains/{}/node'.format(dom_uuid): self.name,
                '/domains/{}/lastnode'.format(dom_uuid): ''
            })

            # Wait for the VM to migrate back
            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(dom_uuid)) != 'start':
                time.sleep(1)

        zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' })
        # Clear the thread handle before the flag: a waiter proceeds once the
        # flag drops and may then install a new thread handle
        self.flush_thread = None
        self.flush_stopper = False
|