diff --git a/node-daemon/pvcnoded/CephInstance.py b/node-daemon/pvcnoded/CephInstance.py index a4893ac3..daff4c6b 100644 --- a/node-daemon/pvcnoded/CephInstance.py +++ b/node-daemon/pvcnoded/CephInstance.py @@ -23,20 +23,19 @@ import time import json import psutil -import pvcnoded.zkhandler as zkhandler import pvcnoded.common as common class CephOSDInstance(object): - def __init__(self, zk_conn, this_node, osd_id): - self.zk_conn = zk_conn + def __init__(self, zkhandler, this_node, osd_id): + self.zkhandler = zkhandler self.this_node = this_node self.osd_id = osd_id self.node = None self.size = None self.stats = dict() - @self.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id)) + @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id)) def watch_osd_node(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -51,7 +50,7 @@ class CephOSDInstance(object): if data and data != self.node: self.node = data - @self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id)) + @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id)) def watch_osd_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -67,7 +66,7 @@ class CephOSDInstance(object): self.stats = json.loads(data) -def add_osd(zk_conn, logger, node, device, weight): +def add_osd(zkhandler, logger, node, device, weight): # We are ready to create a new OSD on this node logger.out('Creating new OSD disk on block device {}'.format(device), state='i') try: @@ -174,12 +173,12 @@ def add_osd(zk_conn, logger, node, device, weight): # 7. Add the new OSD to the list logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i') - zkhandler.writedata(zk_conn, { - '/ceph/osds/{}'.format(osd_id): '', - '/ceph/osds/{}/node'.format(osd_id): node, - '/ceph/osds/{}/device'.format(osd_id): device, - '/ceph/osds/{}/stats'.format(osd_id): '{}' - }) + zkhandler.write([ + ('/ceph/osds/{}'.format(osd_id), ''), + ('/ceph/osds/{}/node'.format(osd_id), node), + ('/ceph/osds/{}/device'.format(osd_id), device), + ('/ceph/osds/{}/stats'.format(osd_id), '{}') + ]) # Log it logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o') @@ -190,7 +189,7 @@ def add_osd(zk_conn, logger, node, device, weight): return False -def remove_osd(zk_conn, logger, osd_id, osd_obj): +def remove_osd(zkhandler, logger, osd_id, osd_obj): logger.out('Removing OSD disk {}'.format(osd_id), state='i') try: # 1. Verify the OSD is present @@ -273,7 +272,7 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj): # 7. Delete OSD from ZK logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i') - zkhandler.deletekey(zk_conn, '/ceph/osds/{}'.format(osd_id)) + zkhandler.delete('/ceph/osds/{}'.format(osd_id), recursive=True) # Log it logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o') @@ -285,14 +284,14 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj): class CephPoolInstance(object): - def __init__(self, zk_conn, this_node, name): - self.zk_conn = zk_conn + def __init__(self, zkhandler, this_node, name): + self.zkhandler = zkhandler self.this_node = this_node self.name = name self.pgs = '' self.stats = dict() - @self.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name)) + @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name)) def watch_pool_node(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -307,7 +306,7 @@ class CephPoolInstance(object): if data and data != self.pgs: self.pgs = data - @self.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name)) + @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name)) def watch_pool_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -324,14 +323,14 @@ class CephPoolInstance(object): class CephVolumeInstance(object): - def __init__(self, zk_conn, this_node, pool, name): - self.zk_conn = zk_conn + def __init__(self, zkhandler, this_node, pool, name): + self.zkhandler = zkhandler self.this_node = this_node self.pool = pool self.name = name self.stats = dict() - @self.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name)) + @self.zkhandler.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name)) def watch_volume_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -348,15 +347,15 @@ class CephVolumeInstance(object): class CephSnapshotInstance(object): - def __init__(self, zk_conn, this_node, pool, volume, name): - self.zk_conn = zk_conn + def __init__(self, zkhandler, this_node, pool, volume, name): + self.zkhandler = zkhandler self.this_node = this_node self.pool = pool self.volume = volume self.name = name self.stats = dict() - @self.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name)) + @self.zkhandler.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name)) def watch_snapshot_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -374,7 +373,7 @@ class CephSnapshotInstance(object): # Primary command function # This command pipe is only used for OSD adds and removes -def run_command(zk_conn, logger, this_node, data, d_osd): +def run_command(zkhandler, logger, this_node, data, d_osd): # Get the command and args command, args = data.split() @@ -383,18 +382,22 @@ def run_command(zk_conn, logger, this_node, data, d_osd): node, device, weight = args.split(',') if node == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph') + zk_lock = zkhandler.writelock('/cmd/ceph') with zk_lock: # Add the OSD - result = add_osd(zk_conn, logger, node, device, weight) + result = add_osd(zkhandler, logger, node, device, weight) # Command succeeded if result: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)}) + zkhandler.write([ + ('/cmd/ceph', 'success-{}'.format(data)) + ]) # Command failed else: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)}) + zkhandler.write([ + ('/cmd/ceph', 'failure-{}'.format(data)) + ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) @@ -405,17 +408,21 @@ def run_command(zk_conn, logger, this_node, data, d_osd): # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph') + zk_lock = zkhandler.writelock('/cmd/ceph') with zk_lock: # Remove the OSD - result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) + result = remove_osd(zkhandler, logger, osd_id, d_osd[osd_id]) # Command succeeded if result: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)}) + zkhandler.write([ + ('/cmd/ceph', 'success-{}'.format(data)) + ]) # Command failed else: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)}) + zkhandler.write([ + ('/cmd/ceph', 'failure-{}'.format(data)) + ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 36af6c0c..e0c0eabc 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -971,7 +971,7 @@ if enable_storage: # Add any missing OSDs to the list for osd in new_osd_list: if osd not in osd_list: - d_osd[osd] = CephInstance.CephOSDInstance(zkhandler.zk_conn, this_node, osd) + d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd) # Remove any deleted OSDs from the list for osd in osd_list: @@ -991,7 +991,7 @@ if enable_storage: # Add any missing Pools to the list for pool in new_pool_list: if pool not in pool_list: - d_pool[pool] = CephInstance.CephPoolInstance(zkhandler.zk_conn, this_node, pool) + d_pool[pool] = CephInstance.CephPoolInstance(zkhandler, this_node, pool) d_volume[pool] = dict() volume_list[pool] = [] @@ -1014,7 +1014,7 @@ if enable_storage: # Add any missing Volumes to the list for volume in new_volume_list: if volume not in volume_list[pool]: - d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler.zk_conn, this_node, pool, volume) + d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler, this_node, pool, volume) # Remove any deleted Volumes from the list for volume in volume_list[pool]: