pvc/node-daemon/pvcnoded/objects/VMInstance.py
Joshua M. Boniface 0d72798814 Work around synchronization lock issues
Make the block on phase C wait only 900 seconds (15 minutes), to
prevent indefinite blocking.

The issue arises when a VM is being received and the current unflush
is cancelled in favour of a flush. When this happens, this lock
acquisition seems to block for no obvious reason, and no other change
seems to affect it. This is almost certainly some sort of locking bug
within Kazoo, but I can't diagnose it as-is. Leave a TODO to revisit
this in the future.
2021-09-26 23:26:21 -04:00
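
The workaround pattern, as a minimal sketch (here "lock" stands for the
Kazoo-backed exclusive lock returned by zkhandler.exclusivelock() in the
file below):

    try:
        # Bound the acquisition so a wedged peer cannot block this thread
        # forever; Kazoo raises LockTimeout once the deadline passes.
        lock.acquire(timeout=900)
        lock.release()
    except Exception:
        # Log a warning and continue instead of blocking indefinitely
        pass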


#!/usr/bin/env python3
# VMInstance.py - Class implementing a PVC virtual machine in pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import uuid
import time
import libvirt
import json
from threading import Thread
from xml.etree import ElementTree
from re import match
import daemon_lib.common as common
import pvcnoded.objects.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import daemon_lib.common as daemon_common
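
# Each VMInstance tracks a single VM (domain) defined in Zookeeper. A Kazoo
# DataWatch on the domain.state key fires manage_vm_state() in a new thread
# whenever that key changes, and the handler reconciles the VM's actual
# libvirt state with the requested state.
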
class VMInstance(object):
    # Initialization function
    def __init__(self, domuuid, zkhandler, config, logger, this_node):
        # Passed-in variables on creation
        self.domuuid = domuuid
        self.zkhandler = zkhandler
        self.config = config
        self.logger = logger
        self.this_node = this_node

        # Get data from zookeeper
        self.domname = self.zkhandler.read(('domain', domuuid))
        self.state = self.zkhandler.read(('domain.state', domuuid))
        self.node = self.zkhandler.read(('domain.node', domuuid))
        self.lastnode = self.zkhandler.read(('domain.last_node', domuuid))
        self.last_currentnode = self.zkhandler.read(('domain.node', domuuid))
        self.last_lastnode = self.zkhandler.read(('domain.last_node', domuuid))
        try:
            self.migration_method = self.zkhandler.read(('domain.meta.migrate_method', self.domuuid))
        except Exception:
            self.migration_method = 'none'

        # These will all be set later
        self.instart = False
        self.inrestart = False
        self.inmigrate = False
        self.inreceive = False
        self.inshutdown = False
        self.instop = False

        # State thread
        self.state_thread = None

        # Libvirt domuuid
        self.dom = self.lookupByUUID(self.domuuid)

        # Log watcher instance
        self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zkhandler, self.config, self.logger, self.this_node)

        # Watch for changes to the state field in Zookeeper
        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('domain.state', self.domuuid))
        def watch_state(data, stat, event=""):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            # Perform a management command
            self.logger.out('Updating state of VM {}'.format(self.domuuid), state='i')
            self.state_thread = Thread(target=self.manage_vm_state, args=(), kwargs={})
            self.state_thread.start()

    # Get data functions
    def getstate(self):
        return self.state

    def getnode(self):
        return self.node

    def getlastnode(self):
        return self.lastnode

    def getdom(self):
        return self.dom

    def getmemory(self):
        try:
            if self.dom is not None:
                memory = int(self.dom.info()[2] / 1024)
            else:
                domain_information = daemon_common.getInformationFromXML(self.zkhandler, self.domuuid)
                memory = int(domain_information['memory'])
        except Exception:
            memory = 0

        return memory

    def getvcpus(self):
        try:
            vcpus = int(self.dom.info()[3])
        except Exception:
            vcpus = 0

        return vcpus

    # Manage local node domain_list
    def addDomainToList(self):
        if self.domuuid not in self.this_node.domain_list:
            try:
                # Add the domain to the domain_list array
                self.this_node.domain_list.append(self.domuuid)
                # Push the change up to Zookeeper
                self.zkhandler.write([
                    (('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list))
                ])
            except Exception as e:
                self.logger.out('Error adding domain to list: {}'.format(e), state='e')

    def removeDomainFromList(self):
        if self.domuuid in self.this_node.domain_list:
            try:
                # Remove the domain from the domain_list array
                self.this_node.domain_list.remove(self.domuuid)
                # Push the change up to Zookeeper
                self.zkhandler.write([
                    (('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list))
                ])
            except Exception as e:
                self.logger.out('Error removing domain from list: {}'.format(e), state='e')

    # Update the VNC live data
    def update_vnc(self):
        if self.dom is not None:
            live_xml = ElementTree.fromstring(self.dom.XMLDesc(0))
            graphics = live_xml.find('./devices/graphics')
            if graphics is not None:
                self.logger.out('Updating VNC data', state='i', prefix='Domain {}'.format(self.domuuid))
                port = graphics.get('port', '')
                listen = graphics.get('listen', '')
                self.zkhandler.write([
                    (('domain.console.vnc', self.domuuid), '{}:{}'.format(listen, port))
                ])
            else:
                self.zkhandler.write([
                    (('domain.console.vnc', self.domuuid), '')
                ])
        else:
            self.zkhandler.write([
                (('domain.console.vnc', self.domuuid), '')
            ])

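    # Note: the xml_spec passed to attach_device()/detach_device() below is a
    # libvirt *device* XML fragment, not a full domain definition. A
    # hypothetical example (names invented for illustration):
    #   <disk type='network' device='disk'>
    #     <source protocol='rbd' name='vms/testvm_disk1'/>
    #     <target dev='vdb' bus='virtio'/>
    #   </disk>
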
    # Attach a device to the running domain
    def attach_device(self, xml_spec):
        if not self.dom:
            self.logger.out('Cannot attach device to non-running domain', state='w', prefix='Domain {}'.format(self.domuuid))
            return False

        try:
            self.logger.out('Attaching new device to VM', state='i', prefix='Domain {}'.format(self.domuuid))
            self.dom.attachDevice(xml_spec)
            return True
        except Exception as e:
            self.logger.out('Failed to attach device: {}'.format(e), state='e', prefix='Domain {}'.format(self.domuuid))
            return False

    # Detach a device from the running domain
    def detach_device(self, xml_spec):
        if not self.dom:
            self.logger.out('Cannot detach device from non-running domain', state='w', prefix='Domain {}'.format(self.domuuid))
            return False

        try:
            self.logger.out('Detaching device from VM', state='i', prefix='Domain {}'.format(self.domuuid))
            self.dom.detachDevice(xml_spec)
            return True
        except Exception as e:
            self.logger.out('Failed to detach device: {}'.format(e), state='e', prefix='Domain {}'.format(self.domuuid))
            return False

    # Start up the VM
    def start_vm(self):
        # Start the log watcher
        self.console_log_instance.start()

        self.logger.out('Starting VM', state='i', prefix='Domain {}'.format(self.domuuid))
        self.instart = True

        # Start up a new Libvirt connection
        libvirt_name = "qemu:///system"
        lv_conn = libvirt.open(libvirt_name)
        if lv_conn is None:
            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
            self.instart = False
            return

        # Try to get the current state in case it's already running
        try:
            self.dom = self.lookupByUUID(self.domuuid)
            curstate = self.dom.state()[0]
        except Exception:
            curstate = 'notstart'

        # Handle situations where the VM crashed or the node unexpectedly rebooted
        if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
            # Flush locks
            self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
            VMInstance.flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node)
            if self.zkhandler.read(('domain.state', self.domuuid)) == 'fail':
                lv_conn.close()
                self.dom = None
                self.instart = False
                return

        if curstate == libvirt.VIR_DOMAIN_RUNNING:
            # If it is running just update the model
            self.addDomainToList()
            self.zkhandler.write([
                (('domain.failed_reason', self.domuuid), '')
            ])
        else:
            # Or try to create it
            try:
                # Grab the domain information from Zookeeper
                xmlconfig = self.zkhandler.read(('domain.xml', self.domuuid))
                dom = lv_conn.createXML(xmlconfig, 0)
                self.addDomainToList()
                self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid))
                self.dom = dom
                self.zkhandler.write([
                    (('domain.failed_reason', self.domuuid), '')
                ])
            except libvirt.libvirtError as e:
                self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid))
                self.zkhandler.write([
                    (('domain.state', self.domuuid), 'fail'),
                    (('domain.failed_reason', self.domuuid), str(e))
                ])
                lv_conn.close()
                self.dom = None
                self.instart = False
                return

        lv_conn.close()
        self.instart = False

    # Restart the VM
    def restart_vm(self):
        self.logger.out('Restarting VM', state='i', prefix='Domain {}'.format(self.domuuid))
        self.inrestart = True

        # Start up a new Libvirt connection
        libvirt_name = "qemu:///system"
        lv_conn = libvirt.open(libvirt_name)
        if lv_conn is None:
            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
            self.inrestart = False
            return

        self.shutdown_vm()
        time.sleep(0.2)
        self.start_vm()
        self.addDomainToList()

        self.zkhandler.write([
            (('domain.state', self.domuuid), 'start')
        ])
        lv_conn.close()
        self.inrestart = False

    # Stop the VM forcibly without updating state
    def terminate_vm(self):
        self.logger.out('Terminating VM', state='i', prefix='Domain {}'.format(self.domuuid))
        self.instop = True
        try:
            self.dom.destroy()
            time.sleep(0.2)
            try:
                if self.getdom().state()[0] == libvirt.VIR_DOMAIN_RUNNING:
                    # It didn't terminate, try again
                    self.dom.destroy()
            except libvirt.libvirtError:
                pass
        except AttributeError:
            self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}'.format(self.domuuid))
        self.removeDomainFromList()
        self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}'.format(self.domuuid))
        self.dom = None
        self.instop = False

        # Stop the log watcher
        self.console_log_instance.stop()

    # Stop the VM forcibly
    def stop_vm(self):
        self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
        self.instop = True
        try:
            self.dom.destroy()
            time.sleep(0.2)
            try:
                if self.getdom().state()[0] == libvirt.VIR_DOMAIN_RUNNING:
                    # It didn't terminate, try again
                    self.dom.destroy()
            except libvirt.libvirtError:
                pass
        except AttributeError:
            self.logger.out('Failed to stop VM', state='e', prefix='Domain {}'.format(self.domuuid))
        self.removeDomainFromList()

        if self.inrestart is False:
            self.zkhandler.write([
                (('domain.state', self.domuuid), 'stop')
            ])

        self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid))
        self.dom = None
        self.instop = False

        # Stop the log watcher
        self.console_log_instance.stop()

    # Shutdown the VM gracefully
    def shutdown_vm(self):
        self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
        is_aborted = False
        self.inshutdown = True
        self.dom.shutdown()
        tick = 0
        while True:
            tick += 1
            time.sleep(1)

            # Abort shutdown if the state changes to start
            current_state = self.zkhandler.read(('domain.state', self.domuuid))
            if current_state not in ['shutdown', 'restart', 'migrate']:
                self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid))
                is_aborted = True
                break

            try:
                lvdomstate = self.dom.state()[0]
            except Exception:
                lvdomstate = None

            if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
                self.removeDomainFromList()
                self.zkhandler.write([
                    (('domain.state', self.domuuid), 'stop')
                ])
                self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid))
                self.dom = None
                # Stop the log watcher
                self.console_log_instance.stop()
                break

            if tick >= self.config['vm_shutdown_timeout']:
                self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid))
                self.zkhandler.write([
                    (('domain.state', self.domuuid), 'stop')
                ])
                break

        self.inshutdown = False

        if is_aborted:
            self.manage_vm_state()

        if self.inrestart:
            # Wait to prevent race conditions
            time.sleep(1)
            self.zkhandler.write([
                (('domain.state', self.domuuid), 'start')
            ])

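    # Migration synchronization: the sender (migrate_vm, below) and the
    # receiver (receive_migrate) coordinate through the
    # domain.migrate.sync_lock key in Zookeeper. The receiver acquires the
    # lock first (phase A), the sender holds it while performing the actual
    # live or shutdown-based send (phase B), the receiver re-acquires it to
    # wait out the transfer (phase C), the sender confirms completion
    # (phase D), and the receiver finalizes the VM state (phase E). The short
    # sleeps between phases give the other side time to win the lock race.
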
    # Migrate the VM to a target host
    def migrate_vm(self, force_live=False, force_shutdown=False):
        # Wait for any previous migration
        while self.inmigrate:
            time.sleep(0.1)

        if self.migration_method == 'live':
            force_live = True
        elif self.migration_method == 'shutdown':
            force_shutdown = True

        self.inmigrate = True

        self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))

        # Used for sanity checking later
        target_node = self.zkhandler.read(('domain.node', self.domuuid))

        aborted = False

        def abort_migrate(reason):
            self.zkhandler.write([
                (('domain.state', self.domuuid), 'start'),
                (('domain.node', self.domuuid), self.this_node.name),
                (('domain.last_node', self.domuuid), self.last_lastnode)
            ])
            migrate_lock_node.release()
            migrate_lock_state.release()
            self.inmigrate = False
            self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid))

        # Acquire exclusive lock on the domain node key
        migrate_lock_node = self.zkhandler.exclusivelock(('domain.node', self.domuuid))
        migrate_lock_state = self.zkhandler.exclusivelock(('domain.state', self.domuuid))
        migrate_lock_node.acquire()
        migrate_lock_state.acquire()

        # Don't try to migrate a VM to its current node; set back to start
        if self.node == self.lastnode or self.node == self.this_node.name:
            abort_migrate('Target node matches the current active node during initial check')
            return

        time.sleep(0.5)  # Initial delay for the first writer to grab the lock

        def migrate_live():
            self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
            # Set up destination connection
            dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
            dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
            try:
                self.logger.out('Opening remote libvirt connection', state='i', prefix='Domain {}'.format(self.domuuid))
                # Open a connection to the destination
                dest_lv_conn = libvirt.open(dest_lv)
                if not dest_lv_conn:
                    raise
            except Exception:
                self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}'.format(self.domuuid))
                return False

            try:
                self.logger.out('Live migrating VM', state='i', prefix='Domain {}'.format(self.domuuid))
                # Send the live migration; force the destination URI to ensure we transit over the cluster network
                target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
                if not target_dom:
                    raise
            except Exception as e:
                self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}'.format(self.domuuid))
                dest_lv_conn.close()
                return False

            self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
            dest_lv_conn.close()
            self.console_log_instance.stop()
            self.removeDomainFromList()

            return True

        def migrate_shutdown():
            self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid))
            self.shutdown_vm()
            return True

        self.logger.out('Acquiring lock for phase B', state='i', prefix='Domain {}'.format(self.domuuid))
        lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
        try:
            lock.acquire(timeout=30.0)
        except Exception:
            abort_migrate('Timed out waiting for peer')
            return

        migrate_live_result = False

        # Do a final verification
        if self.node == self.lastnode or self.node == self.this_node.name:
            abort_migrate('Target node matches the current active node during final check')
            return
        if self.node != target_node:
            abort_migrate('Target node changed during preparation')
            return

        if not force_shutdown:
            # A live migrate is attempted 3 times in succession
            ticks = 0
            while True:
                ticks += 1
                self.logger.out('Attempting live migration try {}'.format(ticks), state='i', prefix='Domain {}'.format(self.domuuid))
                migrate_live_result = migrate_live()
                if migrate_live_result:
                    break
                time.sleep(0.5)
                if ticks > 2:
                    break
        else:
            migrate_live_result = False

        if not migrate_live_result:
            if force_live:
                self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid))
                aborted = True
            else:
                migrate_shutdown()

        lock.release()

        if aborted:
            abort_migrate('Live migration failed and is required')
            return

        time.sleep(0.5)
        self.logger.out('Acquiring lock for phase D', state='i', prefix='Domain {}'.format(self.domuuid))
        lock.acquire()

        self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
        self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))

        migrate_lock_node.release()
        migrate_lock_state.release()
        lock.release()

        self.inmigrate = False
        return

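    # The phase C acquire below is the step bounded to 900 seconds: if an
    # unflush is cancelled for a flush while a VM is being received, this
    # acquisition has been observed to block indefinitely (apparently a
    # locking bug within Kazoo), so we time out and continue rather than
    # hanging the receive thread.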
    # Receive the migration from another host
    def receive_migrate(self):
        # Wait for any previous migration
        while self.inreceive:
            time.sleep(0.1)

        self.inreceive = True

        self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid))

        # Ensure our lock key is populated
        self.zkhandler.write([
            (('domain.migrate.sync_lock', self.domuuid), self.domuuid)
        ])

        self.logger.out('Acquiring lock for phase A', state='i', prefix='Domain {}'.format(self.domuuid))
        lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
        try:
            lock.acquire(timeout=30.0)
        except Exception:
            self.logger.out('Failed to acquire exclusive lock for VM', state='w')
            return
        # Exactly twice the amount of time that the other side is waiting
        time.sleep(1)
        lock.release()
        time.sleep(0.5)

        self.logger.out('Acquiring lock for phase C', state='i', prefix='Domain {}'.format(self.domuuid))
        try:
            # Wait only 900 seconds on this step, since we don't do anything here and it can fail
            # if a flush or unflush is cancelled. 900 seconds should be plenty for genuinely long
            # migrations while still avoiding an indefinite block.
            # TODO: Really dig into why
            lock.acquire(timeout=900)
            # This is strictly a synchronizing step
            lock.release()
        except Exception:
            self.logger.out('Failed to acquire lock for phase C within 15 minutes, continuing', state='w', prefix='Domain {}'.format(self.domuuid))
        time.sleep(0.5)

        self.logger.out('Acquiring lock for phase E', state='i', prefix='Domain {}'.format(self.domuuid))
        lock.acquire()

        # Set the updated data
        self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
        self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
        self.state = self.zkhandler.read(('domain.state', self.domuuid))
        self.dom = self.lookupByUUID(self.domuuid)
        if self.dom:
            lvdomstate = self.dom.state()[0]
            if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
                # VM has been received and started
                self.addDomainToList()
                self.zkhandler.write([
                    (('domain.state', self.domuuid), 'start')
                ])
                self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
            else:
                # The receive somehow failed
                self.zkhandler.write([
                    (('domain.state', self.domuuid), 'fail'),
                    (('domain.failed_reason', self.domuuid), 'Failed to receive migration')
                ])
                self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}'.format(self.domuuid))
        else:
            if self.node == self.this_node.name:
                if self.state in ['start']:
                    # The receive was aborted
                    self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid))
                elif self.state in ['stop']:
                    # The send was shutdown-based
                    self.zkhandler.write([
                        (('domain.state', self.domuuid), 'start')
                    ])
                else:
                    # The send failed or was aborted
                    self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))

        lock.release()
        self.zkhandler.write([
            (('domain.migrate.sync_lock', self.domuuid), '')
        ])
        self.inreceive = False
        return

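    # manage_vm_state() runs in a short-lived thread spawned by the DataWatch
    # in __init__ each time domain.state changes. It is a one-shot reconciler:
    # it reads the requested state from Zookeeper, compares it against what
    # libvirt reports, and invokes the appropriate action method, with the
    # in-progress flags preventing overlapping watch events from stacking
    # actions.
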
    #
    # Main function to manage a VM (taking only self)
    #
    def manage_vm_state(self):
        # Update the current values from zookeeper
        self.state = self.zkhandler.read(('domain.state', self.domuuid))
        self.node = self.zkhandler.read(('domain.node', self.domuuid))
        self.lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
        self.migration_method = self.zkhandler.read(('domain.meta.migrate_method', self.domuuid))

        # Check the current state of the VM
        try:
            if self.dom is not None:
                running, reason = self.dom.state()
            else:
                raise
        except Exception:
            running = libvirt.VIR_DOMAIN_NOSTATE

        self.logger.out('VM state change for "{}": {} {}'.format(self.domuuid, self.state, self.node), state='i')

        #######################
        # Handle state changes
        #######################
        # Valid states are:
        #   start
        #   migrate
        #   migrate-live
        #   restart
        #   shutdown
        #   stop
        # States we don't (need to) handle are:
        #   disable
        #   provision

        # Conditional pass one - Are we already performing an action
        if self.instart is False \
                and self.inrestart is False \
                and self.inmigrate is False \
                and self.inreceive is False \
                and self.inshutdown is False \
                and self.instop is False:
            # Conditional pass two - Is this VM configured to run on this node
            if self.node == self.this_node.name:
                # Conditional pass three - Is this VM currently running on this node
                if running == libvirt.VIR_DOMAIN_RUNNING:
                    # VM is already running and should be
                    if self.state == "start":
                        # Start the log watcher
                        self.console_log_instance.start()
                        # Add domain to running list
                        self.addDomainToList()
                    # VM is already running and should be but stuck in migrate state
                    elif self.state == "migrate" or self.state == "migrate-live":
                        # Start the log watcher
                        self.console_log_instance.start()
                        self.zkhandler.write([
                            (('domain.state', self.domuuid), 'start')
                        ])
                        # Add domain to running list
                        self.addDomainToList()
                    # VM should be restarted
                    elif self.state == "restart":
                        self.restart_vm()
                    # VM should be shut down
                    elif self.state == "shutdown":
                        self.shutdown_vm()
                    # VM should be stopped
                    elif self.state == "stop":
                        self.stop_vm()
                else:
                    # VM should be started
                    if self.state == "start":
                        # Start the domain
                        self.start_vm()
                    # VM should be migrated to this node
                    elif self.state == "migrate" or self.state == "migrate-live":
                        # Receive the migration
                        self.receive_migrate()
                    # VM should be restarted (i.e. started since it isn't running)
                    if self.state == "restart":
                        self.zkhandler.write([
                            (('domain.state', self.domuuid), 'start')
                        ])
                    # VM should be shut down; ensure it's gone from this node's domain_list
                    elif self.state == "shutdown":
                        self.removeDomainFromList()
                        # Stop the log watcher
                        self.console_log_instance.stop()
                    # VM should be stopped; ensure it's gone from this node's domain_list
                    elif self.state == "stop":
                        self.removeDomainFromList()
                        # Stop the log watcher
                        self.console_log_instance.stop()

                # Update the VNC information
                self.update_vnc()
            else:
                # Conditional pass three - Is this VM currently running on this node
                if running == libvirt.VIR_DOMAIN_RUNNING:
                    # VM should be migrated away from this node
                    if self.state == "migrate":
                        self.migrate_vm(force_live=False)
                    # VM should be migrated away from this node, live only (no shutdown fallback)
                    elif self.state == "migrate-live":
                        self.migrate_vm(force_live=True)
                    # VM should be shutdown gracefully
                    elif self.state == 'shutdown':
                        self.shutdown_vm()
                    # VM should be forcibly terminated
                    else:
                        self.terminate_vm()

        self.state_thread = None
        return

    # This function is a wrapper for libvirt.lookupByUUID which fixes some problems
    # 1. Takes a text UUID and handles converting it to bytes
    # 2. Tries the lookup and returns a sensible value (None) if it fails
    def lookupByUUID(self, tuuid):
        # Don't do anything if the VM shouldn't live on this node
        if self.node != self.this_node.name:
            return None

        lv_conn = None
        libvirt_name = "qemu:///system"

        # Convert the text UUID to bytes
        buuid = uuid.UUID(tuuid).bytes

        # Try
        try:
            # Open a libvirt connection
            lv_conn = libvirt.open(libvirt_name)
            if lv_conn is None:
                self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
                return None

            # Lookup the UUID
            dom = lv_conn.lookupByUUID(buuid)
        # Fail
        except Exception:
            dom = None
        # After everything
        finally:
            # Close the libvirt connection
            if lv_conn is not None:
                lv_conn.close()

        # Return the dom object (or None)
        return dom

    # Flush the locks of a VM based on UUID
    @staticmethod
    def flush_locks(zkhandler, logger, dom_uuid, this_node=None):
        logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')

        # Get the list of RBD images
        rbd_list = zkhandler.read(('domain.storage.volumes', dom_uuid)).split(',')

        for rbd in rbd_list:
            # Check if a lock exists
            lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
            if lock_list_retcode != 0:
                logger.out('Failed to obtain lock list for volume "{}"'.format(rbd), state='e')
                continue

            try:
                lock_list = json.loads(lock_list_stdout)
            except Exception as e:
                logger.out('Failed to parse lock list for volume "{}": {}'.format(rbd, e), state='e')
                continue

            # If there's at least one lock
            if lock_list:
                # Loop through the locks
                for lock in lock_list:
                    if this_node is not None and zkhandler.read(('domain.state', dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr:
                        logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0]), state='e')
                        zkhandler.write([
                            (('domain.state', dom_uuid), 'fail'),
                            (('domain.failed_reason', dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)),
                        ])
                        break
                    # Free the lock
                    lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
                    if lock_remove_retcode != 0:
                        logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
                        zkhandler.write([
                            (('domain.state', dom_uuid), 'fail'),
                            (('domain.failed_reason', dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)),
                        ])
                        break
                    logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')

        return True

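# vm_command() dispatches cluster commands written to the base.cmd.domain key.
# The payload is a whitespace-separated string, e.g. (hypothetically)
# 'flush_locks <dom_uuid>'. Results are reported by rewriting the key with a
# success- or failure- prefix, which the match() guard at the top of the
# function ignores when the watch fires again on that write.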
# Primary command function
def vm_command(zkhandler, logger, this_node, data):
    # Get the command and args
    command, dom_uuid, *args = data.split()

    if match('success-.*', command) or match('failure-.*', command):
        return

    logger.out('Getting command "{}" for domain "{}"'.format(command, dom_uuid), state='i')

    # Verify that the VM is set to run on this node
    domain = this_node.d_domain.get(dom_uuid, None)
    if domain is None:
        return False

    if domain.getnode() != this_node.name:
        return

    # Lock the command queue
    zk_lock = zkhandler.writelock('base.cmd.domain')
    with zk_lock:
        # Flushing VM RBD locks
        if command == 'flush_locks':
            result = VMInstance.flush_locks(zkhandler, logger, dom_uuid, this_node)
        # Attaching a device
        elif command == 'attach_device':
            xml_spec = ' '.join(args)
            result = domain.attach_device(xml_spec)
        # Detaching a device
        elif command == 'detach_device':
            xml_spec = ' '.join(args)
            result = domain.detach_device(xml_spec)
        # Command not defined
        else:
            result = False

        # Command succeeded
        if result:
            # Update the command queue
            zkhandler.write([
                ('base.cmd.domain', 'success-{}'.format(data))
            ])
        # Command failed
        else:
            # Update the command queue
            zkhandler.write([
                ('base.cmd.domain', 'failure-{}'.format(data))
            ])

        # Wait one second before we free the lock, to ensure the client hits the lock
        time.sleep(1)