diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py index 5136bd62..ffc7aa8b 100644 --- a/node-daemon/pvcd/NodeInstance.py +++ b/node-daemon/pvcd/NodeInstance.py @@ -78,8 +78,10 @@ class NodeInstance(object): self.upstream_dev = None self.upstream_ipaddr = None self.upstream_cidrnetmask = None + # Threads + self.flush_thread = None # Flags - self.inflush = False + self.flush_stopper = False # Zookeeper handlers for changed states @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name)) @@ -135,14 +137,21 @@ class NodeInstance(object): # toggle state management of this node if self.name == self.this_node: - if self.domain_state == 'flush' and self.inflush == False: - # Do flushing in a thread so it doesn't block the migrates out - flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) - flush_thread.start() - if self.domain_state == 'unflush' and self.inflush == False: - # Do unflushing in a thread so it doesn't block the migrates in - flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) - flush_thread.start() + # Stop any existing flush jobs + if self.flush_thread: + self.flush_stopper = True + self.logger.out('Waiting for previous migration to complete'.format(self.name), state='i') + while self.flush_stopper: + time.sleep(1) + self.flush_stopper = False + # Do flushing in a thread so it doesn't block the migrates out + if self.domain_state == 'flush': + self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) + self.flush_thread.start() + # Do unflushing in a thread so it doesn't block the migrates in + if self.domain_state == 'unflush': + self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) + self.flush_thread.start() @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name)) def watch_node_memfree(data, stat, event=''): @@ -344,11 +353,17 @@ class NodeInstance(object): # Flush all VMs on the host def flush(self): # Begin flush - self.inflush = True self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i') self.logger.out('Domain list: {}'.format(', '.join(self.domain_list))) fixed_domain_list = self.domain_list.copy() for dom_uuid in fixed_domain_list: + # Allow us to cancel the operation + if self.flush_stopper: + self.logger.out('Aborting node flush'.format(self.name), state='i') + self.flush_thread = None + self.flush_stopper = False + return + self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i') target_node = common.findTargetHypervisor(self.zk_conn, 'mem', dom_uuid) @@ -382,13 +397,20 @@ class NodeInstance(object): zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) - self.inflush = False + self.flush_thread = None + self.flush_stopper = False def unflush(self): - self.inflush = True self.logger.out('Restoring node {} to active service.'.format(self.name), state='i') fixed_domain_list = self.d_domain.copy() for dom_uuid in fixed_domain_list: + # Allow us to cancel the operation + if self.flush_stopper: + self.logger.out('Aborting node unflush'.format(self.name), state='i') + self.flush_thread = None + self.flush_stopper = False + return + try: last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) except: @@ -409,5 +431,5 @@ class NodeInstance(object): time.sleep(1) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' }) - self.inflush = False - + self.flush_thread = None + self.flush_stopper = False