diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 460c2eb3..43d15035 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -22,7 +22,7 @@ import time import uuid from functools import wraps -from kazoo.client import KazooClient +from kazoo.client import KazooClient, KazooState # @@ -79,7 +79,7 @@ class ZKConnectionException(Exception): # Handler class # class ZKHandler(object): - def __init__(self, config): + def __init__(self, config, logger=None): """ Initialize an instance of the ZKHandler class with config @@ -87,6 +87,7 @@ class ZKHandler(object): """ self.encoding = 'utf8' self.coordinators = config['coordinators'] + self.logger = logger self.zk_conn = KazooClient(hosts=self.coordinators) # @@ -95,15 +96,43 @@ class ZKHandler(object): def coordinators(self): return str(self.coordinators) + def log(self, message, state=''): + if self.logger is not None: + self.logger.out(message, state) + else: + print(message) + # # State/connection management # - def connect(self): + def listener(self, state): + if state == KazooState.CONNECTED: + self.log('Connection to Zookeeper started', state='o') + else: + self.log('Connection to Zookeeper lost', state='w') + + while True: + time.sleep(0.5) + + _zk_conn = KazooClient(hosts=self.coordinators) + try: + _zk_conn.start() + except Exception: + del _zk_conn + continue + + self.zk_conn = _zk_conn + self.zk_conn.add_listener(self.listener) + break + + def connect(self, persistent=False): """ Start the zk_conn object and connect to the cluster """ try: self.zk_conn.start() + if persistent: + self.zk_conn.add_listener(self.listener) except Exception as e: raise ZKConnectionException(self, e) @@ -140,14 +169,14 @@ class ZKHandler(object): Create or update one or more keys' data """ if type(kvpairs) is not list: - print("ZKHandler error: Key-value sequence is not a list") + self.log("ZKHandler error: Key-value sequence is not a list", state='e') return False transaction = self.zk_conn.transaction() for kvpair in (kvpairs): if type(kvpair) is not tuple: - print("ZKHandler error: Key-value pair '{}' is not a tuple".format(kvpair)) + self.log("ZKHandler error: Key-value pair '{}' is not a tuple".format(kvpair), state='e') return False key = kvpair[0] @@ -172,14 +201,14 @@ class ZKHandler(object): try: transaction.check(key, new_version) except TypeError: - print("ZKHandler error: Key '{}' does not match expected version".format(key)) + self.log("ZKHandler error: Key '{}' does not match expected version".format(key), state='e') return False try: transaction.commit() return True except Exception as e: - print("ZKHandler error: Failed to commit transaction: {}".format(e)) + self.log("ZKHandler error: Failed to commit transaction: {}".format(e), state='e') return False def delete(self, key, recursive=True): @@ -202,7 +231,7 @@ class ZKHandler(object): Rename one or more keys to a new value """ if type(kkpairs) is not list: - print("ZKHandler error: Key-key sequence is not a list") + self.log("ZKHandler error: Key-key sequence is not a list", state='e') return False transaction = self.zk_conn.transaction() @@ -221,17 +250,17 @@ class ZKHandler(object): for kkpair in (kkpairs): if type(kkpair) is not tuple: - print("ZKHandler error: Key-key pair '{}' is not a tuple".format(kkpair)) + self.log("ZKHandler error: Key-key pair '{}' is not a tuple".format(kkpair), state='e') return False source_key = kkpair[0] destination_key = kkpair[1] if not self.exists(source_key): - print("ZKHander error: Source key '{}' does not exist".format(source_key)) + self.log("ZKHander error: Source key '{}' does not exist".format(source_key), state='e') return False if self.exists(destination_key): - print("ZKHander error: Destination key '{}' already exists".format(destination_key)) + self.log("ZKHander error: Destination key '{}' already exists".format(destination_key), state='e') return False rename_element(transaction, source_key, destination_key) @@ -240,7 +269,7 @@ class ZKHandler(object): transaction.commit() return True except Exception as e: - print("ZKHandler error: Failed to commit transaction: {}".format(e)) + self.log("ZKHandler error: Failed to commit transaction: {}".format(e), state='e') return False # @@ -260,7 +289,7 @@ class ZKHandler(object): break except Exception as e: if count > 5: - print("ZKHandler warning: Failed to acquire read lock after 5 tries: {}".format(e)) + self.log("ZKHandler warning: Failed to acquire read lock after 5 tries: {}".format(e), state='e') break else: time.sleep(0.5) @@ -283,7 +312,7 @@ class ZKHandler(object): break except Exception as e: if count > 5: - print("ZKHandler warning: Failed to acquire write lock after 5 tries: {}".format(e)) + self.log("ZKHandler warning: Failed to acquire write lock after 5 tries: {}".format(e), state='e') break else: time.sleep(0.5) @@ -306,7 +335,7 @@ class ZKHandler(object): break except Exception as e: if count > 5: - print("ZKHandler warning: Failed to acquire exclusive lock after 5 tries: {}".format(e)) + self.log("ZKHandler warning: Failed to acquire exclusive lock after 5 tries: {}".format(e), state='e') break else: time.sleep(0.5)