From 610f6e8f2c3adc04fd77c070315932a5f142875e Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Wed, 9 Jun 2021 21:17:09 -0400 Subject: [PATCH] Convert CephInstance to new ZK schema handler --- node-daemon/pvcnoded/CephInstance.py | 34 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/node-daemon/pvcnoded/CephInstance.py b/node-daemon/pvcnoded/CephInstance.py index ac9aefb0..2996e317 100644 --- a/node-daemon/pvcnoded/CephInstance.py +++ b/node-daemon/pvcnoded/CephInstance.py @@ -35,7 +35,7 @@ class CephOSDInstance(object): self.size = None self.stats = dict() - @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id)) def watch_osd_node(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -50,7 +50,7 @@ class CephOSDInstance(object): if data and data != self.node: self.node = data - @self.zkhandler.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id)) def watch_osd_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -174,10 +174,10 @@ def add_osd(zkhandler, logger, node, device, weight): # 7. Add the new OSD to the list logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i') zkhandler.write([ - ('/ceph/osds/{}'.format(osd_id), ''), - ('/ceph/osds/{}/node'.format(osd_id), node), - ('/ceph/osds/{}/device'.format(osd_id), device), - ('/ceph/osds/{}/stats'.format(osd_id), '{}') + (('osd', osd_id), ''), + (('osd.node', osd_id), node), + (('osd.device', osd_id), device), + (('osd.stats', osd_id), '{}'), ]) # Log it @@ -272,7 +272,7 @@ def remove_osd(zkhandler, logger, osd_id, osd_obj): # 7. Delete OSD from ZK logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i') - zkhandler.delete('/ceph/osds/{}'.format(osd_id), recursive=True) + zkhandler.delete(('osd', osd_id), recursive=True) # Log it logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o') @@ -291,7 +291,7 @@ class CephPoolInstance(object): self.pgs = '' self.stats = dict() - @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name)) def watch_pool_node(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -306,7 +306,7 @@ class CephPoolInstance(object): if data and data != self.pgs: self.pgs = data - @self.zkhandler.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name)) def watch_pool_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -330,7 +330,7 @@ class CephVolumeInstance(object): self.name = name self.stats = dict() - @self.zkhandler.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}')) def watch_volume_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -355,7 +355,7 @@ class CephSnapshotInstance(object): self.name = name self.stats = dict() - @self.zkhandler.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapsho.stats', f'{self.pool}/{self.volume}/{self.name}')) def watch_snapshot_stats(data, stat, event=''): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -382,7 +382,7 @@ def run_command(zkhandler, logger, this_node, data, d_osd): node, device, weight = args.split(',') if node == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock('/cmd/ceph') + zk_lock = zkhandler.writelock('base.cmd.ceph') with zk_lock: # Add the OSD result = add_osd(zkhandler, logger, node, device, weight) @@ -390,13 +390,13 @@ def run_command(zkhandler, logger, this_node, data, d_osd): if result: # Update the command queue zkhandler.write([ - ('/cmd/ceph', 'success-{}'.format(data)) + ('base.cmd.ceph', 'success-{}'.format(data)) ]) # Command failed else: # Update the command queue zkhandler.write([ - ('/cmd/ceph', 'failure-{}'.format(data)) + ('base.cmd.ceph', 'failure-{}'.format(data)) ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) @@ -408,7 +408,7 @@ def run_command(zkhandler, logger, this_node, data, d_osd): # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock('/cmd/ceph') + zk_lock = zkhandler.writelock('base.cmd.ceph') with zk_lock: # Remove the OSD result = remove_osd(zkhandler, logger, osd_id, d_osd[osd_id]) @@ -416,13 +416,13 @@ def run_command(zkhandler, logger, this_node, data, d_osd): if result: # Update the command queue zkhandler.write([ - ('/cmd/ceph', 'success-{}'.format(data)) + ('base.cmd.ceph', 'success-{}'.format(data)) ]) # Command failed else: # Update the command queue zkhandler.write([ - ('/cmd/ceph', 'failure-{}'.format(data)) + ('base.cmd.ceph', 'failure-{}'.format(data)) ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1)