From 8f160abf902564593ce67cf86e73048659bb04c1 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Wed, 10 Jul 2019 01:07:56 -0400 Subject: [PATCH] Handle cancelling flushes when new ones run Store the flush_thread of a node as a class object. Before starting a new flush thread (either flush or unflush), stop the existing one if it exists to prevent further migrations, then start the new thread. Set the object to None on init and again once the task actually finishes. Remove the inflush flag as this is not required when using these threads and functionally does nothing any longer, but add the flush_stopper flag to trigger cancellation of the current job. --- node-daemon/pvcd/NodeInstance.py | 50 +++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py index 5136bd62..ffc7aa8b 100644 --- a/node-daemon/pvcd/NodeInstance.py +++ b/node-daemon/pvcd/NodeInstance.py @@ -78,8 +78,10 @@ class NodeInstance(object): self.upstream_dev = None self.upstream_ipaddr = None self.upstream_cidrnetmask = None + # Threads + self.flush_thread = None # Flags - self.inflush = False + self.flush_stopper = False # Zookeeper handlers for changed states @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name)) @@ -135,14 +137,21 @@ class NodeInstance(object): # toggle state management of this node if self.name == self.this_node: - if self.domain_state == 'flush' and self.inflush == False: - # Do flushing in a thread so it doesn't block the migrates out - flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) - flush_thread.start() - if self.domain_state == 'unflush' and self.inflush == False: - # Do unflushing in a thread so it doesn't block the migrates in - flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) - flush_thread.start() + # Stop any existing flush jobs + if self.flush_thread: + self.flush_stopper = True + self.logger.out('Waiting for previous migration to complete'.format(self.name), state='i') + while self.flush_stopper: + time.sleep(1) + self.flush_stopper = False + # Do flushing in a thread so it doesn't block the migrates out + if self.domain_state == 'flush': + self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) + self.flush_thread.start() + # Do unflushing in a thread so it doesn't block the migrates in + if self.domain_state == 'unflush': + self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) + self.flush_thread.start() @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name)) def watch_node_memfree(data, stat, event=''): @@ -344,11 +353,17 @@ class NodeInstance(object): # Flush all VMs on the host def flush(self): # Begin flush - self.inflush = True self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i') self.logger.out('Domain list: {}'.format(', '.join(self.domain_list))) fixed_domain_list = self.domain_list.copy() for dom_uuid in fixed_domain_list: + # Allow us to cancel the operation + if self.flush_stopper: + self.logger.out('Aborting node flush'.format(self.name), state='i') + self.flush_thread = None + self.flush_stopper = False + return + self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i') target_node = common.findTargetHypervisor(self.zk_conn, 'mem', dom_uuid) @@ -382,13 +397,20 @@ class NodeInstance(object): zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) - self.inflush = False + self.flush_thread = None + self.flush_stopper = False def unflush(self): - self.inflush = True self.logger.out('Restoring node {} to active service.'.format(self.name), state='i') fixed_domain_list = self.d_domain.copy() for dom_uuid in fixed_domain_list: + # Allow us to cancel the operation + if self.flush_stopper: + self.logger.out('Aborting node unflush'.format(self.name), state='i') + self.flush_thread = None + self.flush_stopper = False + return + try: last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) except: @@ -409,5 +431,5 @@ class NodeInstance(object): time.sleep(1) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' }) - self.inflush = False - + self.flush_thread = None + self.flush_stopper = False