def clone_volume(zk_conn, pool, name, new_name):
    """Ask the cluster to clone an RBD volume and report the outcome.

    The request is posted to the '/cmd/ceph' key, the reply is read back
    from the same key under a read lock, and the key is then cleared
    under a write lock.

    Args:
        zk_conn: Zookeeper connection handed to the zkhandler helpers.
        pool: Name of the pool holding the source volume.
        name: Name of the existing volume to clone.
        new_name: Name for the cloned volume.

    Returns:
        A (success, message) tuple; success is True only when the reply
        read back from '/cmd/ceph' is 'success-volume_clone'.
    """
    # Fail fast on a missing source volume, mirroring remove_volume
    if not verifyVolume(zk_conn, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool {}.'.format(name, pool)

    # Tell the cluster to clone. The node-side handler unpacks exactly four
    # comma-separated fields (pool, source, target, prefix), so an explicit
    # empty prefix is sent as the fourth field; with only three fields the
    # unpack on the node raises and the request can never succeed.
    clone_volume_string = 'volume_clone {},{},{},'.format(pool, name, new_name)
    zkhandler.writedata(zk_conn, {'/cmd/ceph': clone_volume_string})
    # Wait 1/2 second for the cluster to get the message and start working
    time.sleep(0.5)
    # Acquire a read lock, so we get the return exclusively
    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
    with lock:
        try:
            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
        except Exception:
            # No usable reply, e.g. an empty key makes split()[0] raise
            # IndexError. Exception (not a bare except) keeps Ctrl-C and
            # interpreter exit working while we wait.
            message = 'ERROR: Command ignored by node.'
            success = False
        else:
            if result == 'success-volume_clone':
                message = 'Cloned RBD volume "{}" to "{}" on pool "{}".'.format(name, new_name, pool)
                success = True
            else:
                message = 'ERROR: Failed to clone volume {} to {}; check node logs for details.'.format(name, new_name)
                success = False

    # Acquire a write lock to ensure things go smoothly
    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
    with lock:
        time.sleep(1)
        # Clear the command key now that the reply has been consumed
        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})

    return success, message
def clone_volume(zk_conn, logger, pool, name_orig, name_new, prefix):
    """Copy an RBD volume within a pool and register the copy in Zookeeper.

    Args:
        zk_conn: Zookeeper connection handed to zkhandler.writedata.
        logger: Logger whose out() method receives progress messages.
        pool: Name of the pool holding both the source and the copy.
        name_orig: Name of the existing volume to copy.
        name_new: Base name of the new volume.
        prefix: Optional string prepended to name_new; any falsy value is
            treated as an empty string.

    Returns:
        True if the copy and the Zookeeper registration both succeeded,
        False otherwise (the failure is logged, not raised).
    """
    if not prefix:
        # Ensure that prefix is just an empty string if it isn't set
        prefix = ''
    logger.out('Cloning RBD volume {} to {}{} on pool {}'.format(name_orig, prefix, name_new, pool), state='i')
    try:
        # Clone the volume
        retcode, stdout, stderr = common.run_os_command('rbd copy {}/{} {}/{}{}'.format(pool, name_orig, pool, prefix, name_new))
        if retcode:
            # Raise a real exception carrying the command output; a bare
            # `raise` here has no active exception and would only surface
            # as "No active exception to reraise" in the failure log.
            raise RuntimeError('rbd copy exited with code {}: {} {}'.format(retcode, stdout, stderr))

        # Get volume stats
        retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}{}'.format(pool, prefix, name_new))
        volstats = stdout

        # Add the new volume to ZK
        zkhandler.writedata(zk_conn, {
            '/ceph/volumes/{}/{}{}'.format(pool, prefix, name_new): '',
            '/ceph/volumes/{}/{}{}/stats'.format(pool, prefix, name_new): volstats,
            '/ceph/snapshots/{}/{}{}'.format(pool, prefix, name_new): '',
        })

        # Log it; the keyword must be `state` as in the other logger.out
        # calls here. If logger.out rejects the misspelled `stats`, the
        # handler below would report a finished clone as a failure.
        logger.out('Cloned RBD volume {} to {}{} on pool {}'.format(name_orig, prefix, name_new, pool), state='o')
        return True
    except Exception as e:
        # Log it
        logger.out('Failed to clone RBD volume {} to {}{} on pool {}: {}'.format(name_orig, prefix, name_new, pool, e), state='e')
        return False
zk_lock: + # Clone the volume + result = clone_volume(zk_conn, logger, pool, name_orig, name_new, prefix) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)}) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + # Resizing a volume elif command == 'volume_resize': pool, name, size = args.split(',')