diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py index bcf81a29..675ee792 100644 --- a/node-daemon/pvcd/CephInstance.py +++ b/node-daemon/pvcd/CephInstance.py @@ -417,3 +417,176 @@ def remove_pool(zk_conn, logger, name): logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e') return False +def run_command(zk_conn, command): + # Get the command and args + command, args = data.split() + + # Adding a new OSD + if command == 'osd_add': + node, device, weight = args.split(',') + if node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Add the OSD + result = add_osd(zk_conn, logger, node, device, weight) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Removing an OSD + elif command == 'osd_remove': + osd_id = args + + # Verify osd_id is in the list + if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Remove the OSD + result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Online an OSD + elif command == 'osd_in': + osd_id = args + + # Verify osd_id is in the list + if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Online the OSD + result = in_osd(zk_conn, logger, osd_id) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Offline an OSD + elif command == 'osd_out': + osd_id = args + + # Verify osd_id is in the list + if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Offline the OSD + result = out_osd(zk_conn, logger, osd_id) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Set a property + elif command == 'osd_set': + option = args + + if this_node.router_state == 'primary': + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Set the property + result = set_property(zk_conn, logger, option) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Unset a property + elif command == 'osd_unset': + option = args + + if this_node.router_state == 'primary': + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Unset the property + result = unset_property(zk_conn, logger, option) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Adding a new pool + if command == 'pool_add': + name, pgs = args.split(',') + + if this_node.router_state == 'primary': + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Add the pool + result = add_pool(zk_conn, logger, name, pgs) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Removing a pool + elif command == 'pool_remove': + name = args + + if this_node.router_state == 'primary': + # Lock the command queue + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: + # Remove the pool + result = remove_pool(zk_conn, logger, name) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index 463eaf45..48d9c600 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -674,171 +674,7 @@ def cmd(data, stat, event=''): data = '' if data: - # Get the command and args - command, args = data.split() - - # Adding a new OSD - if command == 'osd_add': - node, device, weight = args.split(',') - if node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Add the OSD - result = CephInstance.add_osd(zk_conn, logger, node, device, weight) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Removing an OSD - elif command == 'osd_remove': - osd_id = args - - # Verify osd_id is in the list - if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Remove the OSD - result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Online an OSD - elif command == 'osd_in': - osd_id = args - - # Verify osd_id is in the list - if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Online the OSD - result = CephInstance.in_osd(zk_conn, logger, osd_id) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Offline an OSD - elif command == 'osd_out': - osd_id = args - - # Verify osd_id is in the list - if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Offline the OSD - result = CephInstance.out_osd(zk_conn, logger, osd_id) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Set a property - elif command == 'osd_set': - option = args - - if this_node.router_state == 'primary': - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Set the property - result = CephInstance.set_property(zk_conn, logger, option) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Unset a property - elif command == 'osd_unset': - option = args - - if this_node.router_state == 'primary': - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Unset the property - result = CephInstance.unset_property(zk_conn, logger, option) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - - # Adding a new pool - if command == 'pool_add': - name, pgs = args.split(',') - if this_node.router_state == 'primary': - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Add the pool - result = CephInstance.add_pool(zk_conn, logger, name, pgs) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) - # Removing a pool - elif command == 'pool_remove': - name = args - - if this_node.router_state == 'primary': - # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with zk_lock: - # Remove the pool - result = CephInstance.remove_pool(zk_conn, logger, name) - # Command succeeded - if result: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) - # Command failed - else: - # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) - # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock - time.sleep(0.5) + CephInstance.run_command(data) # OSD objects @zk_conn.ChildrenWatch('/ceph/osds')