From fef230ad98cf9059d68ce6997c035f802089bd06 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Thu, 27 May 2021 22:48:48 -0400 Subject: [PATCH] Implement class-based version of zkhander --- daemon-common/zkhandler.py | 344 ++++++++++++++++++++++--------------- 1 file changed, 206 insertions(+), 138 deletions(-) diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index e3fedee2..b229d9b0 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -21,164 +21,232 @@ import time import uuid +from kazoo.client import KazooClient -# Exists function -def exists(zk_conn, key): - stat = zk_conn.exists(key) - if stat: - return True - else: - return False +class ZKHandler(object): + def __init__(self, hosts): + """ + Initialize an instance of the ZKHandler class with config + A zk_conn object will be created but not started + """ + self.encoding = 'utf8' + self.zk_conn = KazooClient(hosts=hosts) -# Child list function -def listchildren(zk_conn, key): - children = zk_conn.get_children(key) - return children + # + # State/connection management + # + def connect(self): + """ + Start the zk_conn object and connect to the cluster + """ + self.zk_conn.start() + def disconnect(self): + """ + Stop and close the zk_conn object and disconnect from the cluster -# Delete key function -def deletekey(zk_conn, key, recursive=True): - zk_conn.delete(key, recursive=recursive) + The class instance may be reused later (avoids persistent connections) + """ + self.zk_conn.stop() + self.zk_conn.close() - -# Rename key recursive function -def rename_key_element(zk_conn, zk_transaction, source_key, destination_key): - data_raw = zk_conn.get(source_key) - data = data_raw[0] - zk_transaction.create(destination_key, data) - - if zk_conn.get_children(source_key): - for child_key in zk_conn.get_children(source_key): - child_source_key = "{}/{}".format(source_key, child_key) - child_destination_key = "{}/{}".format(destination_key, child_key) - rename_key_element(zk_conn, zk_transaction, child_source_key, child_destination_key) - - zk_transaction.delete(source_key) - - -# Rename key function -def renamekey(zk_conn, kv): - # Start up a transaction - zk_transaction = zk_conn.transaction() - - # Proceed one KV pair at a time - for source_key in sorted(kv): - destination_key = kv[source_key] - - # Check if the source key exists or fail out - if not zk_conn.exists(source_key): - raise - # Check if the destination key exists and fail out - if zk_conn.exists(destination_key): - raise - - rename_key_element(zk_conn, zk_transaction, source_key, destination_key) - - # Commit the transaction - try: - zk_transaction.commit() - return True - except Exception: - return False - - -# Data read function -def readdata(zk_conn, key): - data_raw = zk_conn.get(key) - data = data_raw[0].decode('utf8') - return data - - -# Data write function -def writedata(zk_conn, kv): - # Start up a transaction - zk_transaction = zk_conn.transaction() - - # Proceed one KV pair at a time - for key in sorted(kv): - data = kv[key] - - # Check if this key already exists or not - if not zk_conn.exists(key): - # We're creating a new key - zk_transaction.create(key, str(data).encode('utf8')) + # + # Key Actions + # + def exists(self, key): + """ + Check if a key exists + """ + stat = self.zk_conn.exists(key) + if stat: + return True else: - # We're updating a key with version validation - orig_data = zk_conn.get(key) - version = orig_data[1].version + return False - # Set what we expect the new version to be - new_version = version + 1 + def read(self, key): + """ + Read data from a key + """ + return self.zk_conn.get(key)[0].decode(self.encoding) - # Update the data - zk_transaction.set_data(key, str(data).encode('utf8')) + def write(self, kvpairs): + """ + Create or update one or more keys' data + """ + if type(kvpairs) is not list: + print("ZKHandler error: Key-value sequence is not a list") + return False - # Set up the check - try: - zk_transaction.check(key, new_version) - except TypeError: - print('Zookeeper key "{}" does not match expected version'.format(key)) + transaction = self.zk_conn.transaction() + + for kvpair in (kvpairs): + if type(kvpair) is not tuple: + print("ZKHandler error: Key-value pair '{}' is not a tuple".format(kvpair)) return False - # Commit the transaction - try: - zk_transaction.commit() - return True - except Exception: - return False + key = kvpair[0] + value = kvpair[1] + if not self.exists(key): + # Creating a new key + transaction.create(key, str(value).encode(self.encoding)) -# Write lock function -def writelock(zk_conn, key): - count = 1 - while True: - try: - lock_id = str(uuid.uuid1()) - lock = zk_conn.WriteLock('{}'.format(key), lock_id) - break - except Exception: - count += 1 - if count > 5: - break else: - time.sleep(0.5) - continue - return lock + # Updating an existing key + data = self.zk_conn.get(key) + version = data[1].version + # Validate the expected version after the execution + new_version = version + 1 + + # Update the data + transaction.set_data(key, str(value).encode(self.encoding)) + + # Check the data + try: + transaction.check(key, new_version) + except TypeError: + print("ZKHandler error: Key '{}' does not match expected version".format(key)) + return False -# Read lock function -def readlock(zk_conn, key): - count = 1 - while True: try: - lock_id = str(uuid.uuid1()) - lock = zk_conn.ReadLock('{}'.format(key), lock_id) - break - except Exception: - count += 1 - if count > 5: - break - else: - time.sleep(0.5) - continue - return lock + transaction.commit() + return True + except Exception as e: + print("ZKHandler error: Failed to commit transaction: {}".format(e)) + return False + def delete(self, key, recursive=True): + """ + Delete a key (defaults to recursive) + """ + if self.zk_conn.delete(key, recursive=recursive): + return True + else: + return False + + def children(self, key): + """ + Lists all children of a key + """ + return self.zk_conn.get_children(key) + + def rename(self, kkpairs): + """ + Rename one or more keys to a new value + """ + if type(kkpairs) is not list: + print("ZKHandler error: Key-key sequence is not a list") + return False + + transaction = self.zk_conn.transaction() + + def rename_element(transaction, source_key, destnation_key): + data = self.zk_conn.get(source_key)[0] + transaction.create(destination_key, data) + + if self.children(source_key): + for child_key in self.children(source_key): + child_source_key = "{}/{}".format(source_key, child_key) + child_destination_key = "{}/{}".format(destination_key, child_key) + rename_element(transaction, child_source_key, child_destination_key) + + transaction.delete(source_key, recursive=True) + + for kkpair in (kkpairs): + if type(kkpair) is not tuple: + print("ZKHandler error: Key-key pair '{}' is not a tuple".format(kkpair)) + return False + + source_key = kkpair[0] + destination_key = kkpair[1] + + if not self.exists(source_key): + print("ZKHander error: Source key '{}' does not exist".format(source_key)) + return False + if self.exists(destination_key): + print("ZKHander error: Destination key '{}' already exists".format(destination_key)) + return False + + rename_element(transaction, source_key, destination_key) -# Exclusive lock function -def exclusivelock(zk_conn, key): - count = 1 - while True: try: - lock_id = str(uuid.uuid1()) - lock = zk_conn.Lock('{}'.format(key), lock_id) - break - except Exception: - count += 1 - if count > 5: + transaction.commit() + return True + except Exception as e: + print("ZKHandler error: Failed to commit transaction: {}".format(e)) + return False + + # + # Lock actions + # + def readlock(self, key): + """ + Acquires a read lock on a key + """ + count = 1 + lock = None + + while True: + try: + lock_id = str(uuid.uuid1()) + lock = self.zk_conn.ReadLock(key, lock_id) break - else: - time.sleep(0.5) - continue - return lock + except Exception as e: + if count > 5: + print("ZKHandler warning: Failed to acquire read lock after 5 tries: {}".format(e)) + break + else: + time.sleep(0.5) + count += 1 + continue + + return lock + + def writelock(self, key): + """ + Acquires a write lock on a key + """ + count = 1 + lock = None + + while True: + try: + lock_id = str(uuid.uuid1()) + lock = self.zk_conn.WriteLock(key, lock_id) + break + except Exception as e: + if count > 5: + print("ZKHandler warning: Failed to acquire write lock after 5 tries: {}".format(e)) + break + else: + time.sleep(0.5) + count += 1 + continue + + return lock + + def exclusivelock(self, key): + """ + Acquires an exclusive lock on a key + """ + count = 1 + lock = None + + while True: + try: + lock_id = str(uuid.uuid1()) + lock = self.zk_conn.Lock(key, lock_id) + break + except Exception as e: + if count > 5: + print("ZKHandler warning: Failed to acquire exclusive lock after 5 tries: {}".format(e)) + break + else: + time.sleep(0.5) + count += 1 + continue + + return lock