Remove /ceph/cmd pipe for (most) Ceph commands

Addresses #80
This commit is contained in:
2020-02-08 23:35:30 -05:00
parent eeb8879f73
commit 7ace5b5056
2 changed files with 194 additions and 1042 deletions
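For reference, the daemon-side handler removed below implements a simple request/acknowledge protocol over the /cmd/ceph Zookeeper key: a client writes a 'command args' string, the daemon takes a write lock on the key, runs the matching ceph/rbd operation, and overwrites the key with 'success-<data>' or 'failure-<data>'. The client half of that exchange is not part of this diff; the following is only a minimal sketch of how such a request could be issued over a plain kazoo connection (the send_ceph_command name, the zk_hosts value, and the polling loop are assumptions for illustration, not the project's actual client code).

import time
from kazoo.client import KazooClient

def send_ceph_command(zk_hosts, data, timeout=30):
    # Hypothetical client-side sketch of the /cmd/ceph protocol removed here;
    # the real client uses its own zkhandler wrappers rather than raw kazoo.
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    try:
        # Post the request, e.g. 'pool_add testpool,128,copies=3,mincopies=2'
        zk.set('/cmd/ceph', data.encode('ascii'))
        # Poll until the daemon has replaced the request with an acknowledgement
        deadline = time.time() + timeout
        while time.time() < deadline:
            result = zk.get('/cmd/ceph')[0].decode('ascii')
            if result == 'success-{}'.format(data):
                return True
            if result == 'failure-{}'.format(data):
                return False
            time.sleep(0.5)
        return False
    finally:
        zk.stop()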


@@ -283,86 +283,6 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
return False
def in_osd(zk_conn, logger, osd_id):
# We are ready to set the OSD in
logger.out('Setting OSD {} in'.format(osd_id), state='i')
try:
# 1. Set it in
retcode, stdout, stderr = common.run_os_command('ceph osd in {}'.format(osd_id))
if retcode:
print('ceph osd in')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Set OSD {} in'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to set OSD {} in: {}'.format(osd_id, e), state='e')
return False
def out_osd(zk_conn, logger, osd_id):
# We are ready to set the OSD out
logger.out('Setting OSD {} out'.format(osd_id), state='i')
try:
# 1. Set it out
retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
if retcode:
print('ceph osd out')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Set OSD {} out'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to set OSD {} out: {}'.format(osd_id, e), state='e')
return False
def set_property(zk_conn, logger, option):
# We are ready to set an OSD property
logger.out('Setting OSD property {}'.format(option), state='i')
try:
# 1. Set the property
retcode, stdout, stderr = common.run_os_command('ceph osd set {}'.format(option))
if retcode:
print('ceph osd set')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Set OSD property {}'.format(option), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to set OSD property {}: {}'.format(option, e), state='e')
return False
def unset_property(zk_conn, logger, option):
# We are ready to unset an OSD property
logger.out('Unsetting OSD property {}'.format(option), state='i')
try:
# 1. Unset the property
retcode, stdout, stderr = common.run_os_command('ceph osd unset {}'.format(option))
if retcode:
print('ceph osd unset')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Unset OSD property {}'.format(option), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to unset OSD property {}: {}'.format(option, e), state='e')
return False
class CephPoolInstance(object):
def __init__(self, zk_conn, this_node, name):
self.zk_conn = zk_conn
@@ -401,86 +321,6 @@ class CephPoolInstance(object):
if data and data != self.stats:
self.stats = json.loads(data)
def add_pool(zk_conn, logger, name, pgs, copies, mincopies):
# We are ready to create a new pool on this node
logger.out('Creating new RBD pool {}'.format(name), state='i')
try:
# 1. Create the pool
retcode, stdout, stderr = common.run_os_command('ceph osd pool create {} {} replicated'.format(name, pgs))
if retcode:
print('ceph osd pool create')
print(stdout)
print(stderr)
raise
# 2. Set the size and min_size
retcode, stdout, stderr = common.run_os_command('ceph osd pool set {} size {}'.format(name, copies))
if retcode:
print('ceph osd pool set size')
print(stdout)
print(stderr)
raise
retcode, stdout, stderr = common.run_os_command('ceph osd pool set {} min_size {}'.format(name, mincopies))
if retcode:
print('ceph osd pool set min_size')
print(stdout)
print(stderr)
raise
# 3. Enable RBD application
retcode, stdout, stderr = common.run_os_command('ceph osd pool application enable {} rbd'.format(name))
if retcode:
print('ceph osd pool application enable')
print(stdout)
print(stderr)
raise
# 4. Add the new pool to ZK
zkhandler.writedata(zk_conn, {
'/ceph/pools/{}'.format(name): '',
'/ceph/pools/{}/pgs'.format(name): pgs,
'/ceph/pools/{}/stats'.format(name): '{}',
'/ceph/volumes/{}'.format(name): '',
'/ceph/snapshots/{}'.format(name): '',
})
# Log it
logger.out('Created new RBD pool {}'.format(name), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new RBD pool {}: {}'.format(name, e), state='e')
return False
def remove_pool(zk_conn, logger, name):
# We are ready to remove the pool
logger.out('Removing RBD pool {}'.format(name), state='i')
try:
# Remove pool volumes first
for volume in zkhandler.listchildren(zk_conn, '/ceph/volumes/{}'.format(name)):
remove_volume(zk_conn, logger, name, volume)
# Remove the pool
retcode, stdout, stderr = common.run_os_command('ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it'.format(pool=name))
if retcode:
print('ceph osd pool rm')
print(stdout)
print(stderr)
raise
# Delete pool from ZK
zkhandler.deletekey(zk_conn, '/ceph/pools/{}'.format(name))
zkhandler.deletekey(zk_conn, '/ceph/volumes/{}'.format(name))
zkhandler.deletekey(zk_conn, '/ceph/snapshots/{}'.format(name))
# Log it
logger.out('Removed RBD pool {}'.format(name), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e')
return False
class CephVolumeInstance(object):
def __init__(self, zk_conn, this_node, pool, name):
self.zk_conn = zk_conn
@@ -504,158 +344,6 @@ class CephVolumeInstance(object):
if data and data != self.stats:
self.stats = json.loads(data)
def add_volume(zk_conn, logger, pool, name, size):
# We are ready to create a new volume on this node
logger.out('Creating new RBD volume {} on pool {} of size {}'.format(name, pool, size), state='i')
try:
# Create the volume
retcode, stdout, stderr = common.run_os_command('rbd create --size {} --image-feature layering,exclusive-lock {}/{}'.format(size, pool, name))
if retcode:
print('rbd create')
print(stdout)
print(stderr)
raise
# Get volume stats
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}'.format(pool, name))
volstats = stdout
# Add the new volume to ZK
zkhandler.writedata(zk_conn, {
'/ceph/volumes/{}/{}'.format(pool, name): '',
'/ceph/volumes/{}/{}/stats'.format(pool, name): volstats,
'/ceph/snapshots/{}/{}'.format(pool, name): '',
})
# Log it
logger.out('Created new RBD volume {} on pool {}'.format(name, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
return False
def clone_volume(zk_conn, logger, pool, name_orig, name_new, prefix):
if not prefix:
# Ensure that prefix is just an empty string if it isn't set
prefix = ''
logger.out('Cloning RBD volume {} to {}{} on pool {}'.format(name_orig, prefix, name_new, pool), state='i')
try:
# Clone the volume
retcode, stdout, stderr = common.run_os_command('rbd copy {}/{} {}/{}{}'.format(pool, name_orig, pool, prefix, name_new))
if retcode:
print('rbd copy')
print(stdout)
print(stderr)
raise
# Get volume stats
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}{}'.format(pool, prefix, name_new))
volstats = stdout
# Add the new volume to ZK
zkhandler.writedata(zk_conn, {
'/ceph/volumes/{}/{}{}'.format(pool, prefix, name_new): '',
'/ceph/volumes/{}/{}{}/stats'.format(pool, prefix, name_new): volstats,
'/ceph/snapshots/{}/{}{}'.format(pool, prefix, name_new): '',
})
# Log it
logger.out('Cloned RBD volume {} to {}{} on pool {}'.format(name_orig, prefix, name_new, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to clone RBD volume {} to {}{} on pool {}: {}'.format(name_orig, prefix, name_new, pool, e), state='e')
return False
def resize_volume(zk_conn, logger, pool, name, size):
logger.out('Resizing RBD volume {} on pool {} to size {}'.format(name, pool, size), state='i')
try:
# Resize the volume
retcode, stdout, stderr = common.run_os_command('rbd resize --size {} {}/{}'.format(size, pool, name))
if retcode:
print('rbd resize')
print(stdout)
print(stderr)
raise
# Get volume stats
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}'.format(pool, name))
volstats = stdout
# Update the volume in ZK
zkhandler.writedata(zk_conn, {
'/ceph/volumes/{}/{}'.format(pool, name): '',
'/ceph/volumes/{}/{}/stats'.format(pool, name): volstats,
'/ceph/snapshots/{}/{}'.format(pool, name): '',
})
# Log it
logger.out('Resized RBD volume {} on pool {} to size {}'.format(name, pool, size), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to resize RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
return False
def rename_volume(zk_conn, logger, pool, name, new_name):
logger.out('Renaming RBD volume {} to {} on pool {}'.format(name, new_name, pool), state='i')
try:
# Rename the volume
retcode, stdout, stderr = common.run_os_command('rbd rename {}/{} {}'.format(pool, name, new_name))
if retcode:
print('rbd rename')
print(stdout)
print(stderr)
raise
# Rename the volume in ZK
zkhandler.renamekey(zk_conn, {
'/ceph/volumes/{}/{}'.format(pool, name): '/ceph/volumes/{}/{}'.format(pool, new_name),
'/ceph/snapshots/{}/{}'.format(pool, name): '/ceph/snapshots/{}/{}'.format(pool, new_name),
})
# Get volume stats
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}'.format(pool, new_name))
volstats = stdout
# Update the volume stats in ZK
zkhandler.writedata(zk_conn, {
'/ceph/volumes/{}/{}/stats'.format(pool, new_name): volstats,
})
# Log it
logger.out('Renamed RBD volume {} to {} on pool {}'.format(name, new_name, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to rename RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
return False
def remove_volume(zk_conn, logger, pool, name):
# We are ready to remove the volume
logger.out('Removing RBD volume {} from pool {}'.format(name, pool), state='i')
try:
# Remove the volume
retcode, stdout, stderr = common.run_os_command('rbd rm {}/{}'.format(pool, name))
if retcode:
print('rbd rm')
print(stdout)
print(stderr)
raise
# Delete volume from ZK
zkhandler.deletekey(zk_conn, '/ceph/volumes/{}/{}'.format(pool, name))
zkhandler.deletekey(zk_conn, '/ceph/snapshots/{}/{}'.format(pool, name))
# Log it
logger.out('Removed RBD volume {} from pool {}'.format(name, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to remove RBD volume {} from pool {}: {}'.format(name, pool, e), state='e')
return False
class CephSnapshotInstance(object):
def __init__(self, zk_conn, this_node, name):
self.zk_conn = zk_conn
@@ -680,85 +368,8 @@ class CephSnapshotInstance(object):
if data and data != self.stats:
self.stats = json.loads(data)
def add_snapshot(zk_conn, logger, pool, volume, name):
# We are ready to create a new snapshot on this node
logger.out('Creating new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
try:
# 1. Create the snapshot
retcode, stdout, stderr = common.run_os_command('rbd snap create {}/{}@{}'.format(pool, volume, name))
if retcode:
print('rbd snap create')
print(stdout)
print(stderr)
raise
# 2. Add the new snapshot to ZK
zkhandler.writedata(zk_conn, {
'/ceph/snapshots/{}/{}/{}'.format(pool, volume, name): '',
'/ceph/snapshots/{}/{}/{}/stats'.format(pool, volume, name): '{}'
})
# Log it
logger.out('Created new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
return False
def rename_snapshot(zk_conn, logger, pool, volume, name, new_name):
logger.out('Renaming RBD volume snapshot {} to {} for volume {} on pool {}'.format(name, new_name, volume, pool), state='i')
try:
# Rename the volume
retcode, stdout, stderr = common.run_os_command('rbd snap rename {}/{}@{} {}'.format(pool, volume, name, new_name))
if retcode:
print('rbd snap rename')
print(stdout)
print(stderr)
raise
# Rename the snapshot in ZK
zkhandler.renamekey(zk_conn, {
'/ceph/snapshots/{}/{}/{}'.format(pool, volume, name): '/ceph/snapshots/{}/{}/{}'.format(pool, volume, new_name)
})
# Update the snapshot stats in ZK
zkhandler.writedata(zk_conn, {
'/ceph/snapshots/{}/{}/{}/stats'.format(pool, volume, new_name): '{}',
})
# Log it
logger.out('Renamed RBD volume snapshot {} to {} for volume {} on pool {}'.format(name, new_name, volume, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to rename RBD volume snapshot {} for volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
return False
def remove_snapshot(zk_conn, logger, pool, volume, name):
# We are ready to remove the snapshot
logger.out('Removing RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
try:
# Delete snapshot from ZK
zkhandler.deletekey(zk_conn, '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name))
# Remove the snapshot
retcode, stdout, stderr = common.run_os_command('rbd snap rm {}/{}@{}'.format(pool, volume, name))
if retcode:
print('rbd snap rm')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Removed RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to remove RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
return False
# Primary command function
# This command pipe is only used for OSD adds and removes
def run_command(zk_conn, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
@@ -804,301 +415,3 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Online an OSD
elif command == 'osd_in':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Online the OSD
result = in_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Offline an OSD
elif command == 'osd_out':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Offline the OSD
result = out_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Set a property
elif command == 'osd_set':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Set the property
result = set_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Unset a property
elif command == 'osd_unset':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Unset the property
result = unset_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new pool
elif command == 'pool_add':
name, pgs, copies, mincopies = args.split(',')
copies = copies.replace('copies=','')
mincopies = mincopies.replace('mincopies=','')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Add the pool
result = add_pool(zk_conn, logger, name, pgs, copies, mincopies)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a pool
elif command == 'pool_remove':
name = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Remove the pool
result = remove_pool(zk_conn, logger, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new volume
elif command == 'volume_add':
pool, name, size = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Add the volume
result = add_volume(zk_conn, logger, pool, name, size)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Cloning a volume
elif command == 'volume_clone':
pool, name_orig, name_new, prefix = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Clone the volume
result = clone_volume(zk_conn, logger, pool, name_orig, name_new, prefix)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Resizing a volume
elif command == 'volume_resize':
pool, name, size = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Resize the volume
result = resize_volume(zk_conn, logger, pool, name, size)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Renaming a volume
elif command == 'volume_rename':
pool, name, new_name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Rename the volume
result = rename_volume(zk_conn, logger, pool, name, new_name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a volume
elif command == 'volume_remove':
pool, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Remove the volume
result = remove_volume(zk_conn, logger, pool, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new snapshot
elif command == 'snapshot_add':
pool, volume, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Add the snapshot
result = add_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Renaming a snapshot
elif command == 'snapshot_rename':
pool, volume, name, new_name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Rename the snapshot
result = rename_snapshot(zk_conn, logger, pool, volume, name, new_name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a snapshot
elif command == 'snapshot_remove':
pool, volume, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock:
# Remove the snapshot
result = remove_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
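With the pipe gone for these operations, the pool, volume, and snapshot functions are presumably invoked directly by the client/API code added elsewhere in this commit rather than dispatched through /cmd/ceph. The sketch below shows that direct pattern for pool creation only, as an assumption; run_os_command_direct and add_pool_direct are hypothetical names, and the actual replacement lives in the added files not shown in this excerpt.

import subprocess

def run_os_command_direct(command):
    # Minimal stand-in for common.run_os_command as used in the removed code above
    ret = subprocess.run(command.split(), capture_output=True)
    return ret.returncode, ret.stdout.decode(), ret.stderr.decode()

def add_pool_direct(name, pgs, copies, mincopies):
    # Run the same ceph commands as the removed add_pool(), but synchronously,
    # returning the result to the caller instead of acknowledging via Zookeeper
    for cmd in (
        'ceph osd pool create {} {} replicated'.format(name, pgs),
        'ceph osd pool set {} size {}'.format(name, copies),
        'ceph osd pool set {} min_size {}'.format(name, mincopies),
        'ceph osd pool application enable {} rbd'.format(name),
    ):
        retcode, stdout, stderr = run_os_command_direct(cmd)
        if retcode:
            return False, stderr
    return True, ''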