From 7785166a7e726475435241af458858dd71d3af37 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 30 Sep 2024 02:31:06 -0400 Subject: [PATCH] Finish working implementation of send/receive Required some significant refactoring due to issues with the diff send, but it works. --- api-daemon/pvcapid/flaskapi.py | 153 +++++++++++++++++++++++-- api-daemon/pvcapid/helper.py | 175 ++++++++++++++++------------ daemon-common/vm.py | 204 ++++++++++++++++++++++----------- 3 files changed, 378 insertions(+), 154 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 4db6b1cd..d3a4699c 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -3717,16 +3717,12 @@ class API_VM_Snapshot_Receive_Block(Resource): "name": "size", "required": True, }, - { - "name": "source_snapshot", - "required": False, - }, ] ) @Authenticator def post(self, vm, reqargs): """ - Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental + Receive a full snapshot of a single RBD volume from another PVC cluster NOTICE: This is an API-internal endpoint used by /vm//snapshot/send; it should never be called by a client. --- @@ -3753,11 +3749,6 @@ class API_VM_Snapshot_Receive_Block(Resource): type: integer required: true description: The size in bytes of the Ceph RBD volume - - in: query - name: source_snapshot - type: string - required: false - description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers responses: 200: description: OK @@ -3775,13 +3766,151 @@ class API_VM_Snapshot_Receive_Block(Resource): type: object id: Message """ - return api_helper.vm_snapshot_receive_block( + return api_helper.vm_snapshot_receive_block_full( reqargs.get("pool"), reqargs.get("volume"), reqargs.get("snapshot"), int(reqargs.get("size")), flask.request.stream, - source_snapshot=reqargs.get("source_snapshot"), + ) + + @RequestParser( + [ + { + "name": "pool", + "required": True, + }, + { + "name": "volume", + "required": True, + }, + { + "name": "snapshot", + "required": True, + }, + { + "name": "source_snapshot", + "required": True, + }, + ] + ) + @Authenticator + def put(self, vm, reqargs): + """ + Receive a single diff element from a snapshot of a single RBD volume from another PVC cluster + + NOTICE: This is an API-internal endpoint used by /vm//snapshot/send; it should never be called by a client. + --- + tags: + - vm + parameters: + - in: query + name: pool + type: string + required: true + description: The name of the destination Ceph RBD data pool + - in: query + name: volume + type: string + required: true + description: The name of the destination Ceph RBD volume + - in: query + name: snapshot + type: string + required: true + description: The name of the destination Ceph RBD volume snapshot + - in: query + name: source_snapshot + type: string + required: true + description: The name of the destination Ceph RBD volume snapshot parent + responses: + 200: + description: OK + schema: + type: object + id: Message + 400: + description: Execution error + schema: + type: object + id: Message + 404: + description: Not found + schema: + type: object + id: Message + """ + return api_helper.vm_snapshot_receive_block_diff( + reqargs.get("pool"), + reqargs.get("volume"), + reqargs.get("snapshot"), + reqargs.get("source_snapshot"), + flask.request.stream, + ) + + @RequestParser( + [ + { + "name": "pool", + "required": True, + }, + { + "name": "volume", + "required": True, + }, + { + "name": "snapshot", + "required": True, + }, + ] + ) + @Authenticator + def patch(self, vm, reqargs): + """ + Create the block snapshot at snapshot of volume + + NOTICE: This is an API-internal endpoint used by /vm//snapshot/send; it should never be called by a client. + --- + tags: + - vm + parameters: + - in: query + name: pool + type: string + required: true + description: The name of the destination Ceph RBD data pool + - in: query + name: volume + type: string + required: true + description: The name of the destination Ceph RBD volume + - in: query + name: snapshot + type: string + required: true + description: The name of the destination Ceph RBD volume snapshot + responses: + 200: + description: OK + schema: + type: object + id: Message + 400: + description: Execution error + schema: + type: object + id: Message + 404: + description: Not found + schema: + type: object + id: Message + """ + return api_helper.vm_snapshot_receive_block_createsnap( + reqargs.get("pool"), + reqargs.get("volume"), + reqargs.get("snapshot"), ) diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py index 282f4ded..35e12536 100755 --- a/api-daemon/pvcapid/helper.py +++ b/api-daemon/pvcapid/helper.py @@ -1306,82 +1306,32 @@ def vm_flush_locks(zkhandler, vm): @ZKConnection(config) -def vm_snapshot_receive_block( - zkhandler, pool, volume, snapshot, size, stream, source_snapshot=None -): +def vm_snapshot_receive_block_full(zkhandler, pool, volume, snapshot, size, stream): """ Receive an RBD volume from a remote system """ - try: - import rados - import rbd + import rados + import rbd - _, rbd_detail = pvc_ceph.get_list_volume( - zkhandler, pool, limit=volume, is_fuzzy=False - ) - if len(rbd_detail) > 0: - volume_exists = True - else: - volume_exists = False + _, rbd_detail = pvc_ceph.get_list_volume( + zkhandler, pool, limit=volume, is_fuzzy=False + ) + if len(rbd_detail) > 0: + volume_exists = True + else: + volume_exists = False - cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") - cluster.connect() - ioctx = cluster.open_ioctx(pool) + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) - if not source_snapshot and not volume_exists: - rbd_inst = rbd.RBD() - rbd_inst.create(ioctx, volume, size) - retflag, retdata = pvc_ceph.add_volume( - zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True - ) - if not retflag: - ioctx.close() - cluster.shutdown() - - if retflag: - retcode = 200 - else: - retcode = 400 - - output = {"message": retdata.replace('"', "'")} - return output, retcode - - image = rbd.Image(ioctx, volume) - - last_chunk = 0 - chunk_size = 1024 * 1024 * 128 - - if source_snapshot: - # Receiving diff data - logger.info( - f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}" - ) - while True: - chunk = stream.read(chunk_size) - if not chunk: - break - - # Extract the offset and length (8 bytes each) and the data - offset = int.from_bytes(chunk[:8], "big") - length = int.from_bytes(chunk[8:16], "big") - data = chunk[16 : 16 + length] - image.write(data, offset) - else: - # Receiving full image - logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}") - while True: - chunk = flask.request.stream.read(chunk_size) - if not chunk: - break - image.write(chunk, last_chunk) - last_chunk += len(chunk) - - image.create_snap(snapshot) - retflag, retdata = pvc_ceph.add_snapshot( - zkhandler, pool, volume, snapshot, zk_only=True + if not volume_exists: + rbd_inst = rbd.RBD() + rbd_inst.create(ioctx, volume, size) + retflag, retdata = pvc_ceph.add_volume( + zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True ) if not retflag: - image.close() ioctx.close() cluster.shutdown() @@ -1393,11 +1343,90 @@ def vm_snapshot_receive_block( output = {"message": retdata.replace('"', "'")} return output, retcode - image.close() - ioctx.close() - cluster.shutdown() - except Exception as e: - return {"message": f"Failed to import block device: {e}"}, 400 + image = rbd.Image(ioctx, volume) + + last_chunk = 0 + chunk_size = 1024 * 1024 * 64 + + logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}") + while True: + chunk = flask.request.stream.read(chunk_size) + if not chunk: + break + image.write(chunk, last_chunk) + last_chunk += len(chunk) + + image.close() + ioctx.close() + cluster.shutdown() + + +@ZKConnection(config) +def vm_snapshot_receive_block_diff( + zkhandler, pool, volume, snapshot, source_snapshot, stream +): + """ + Receive an RBD volume from a remote system + """ + import rados + import rbd + + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) + image = rbd.Image(ioctx, volume) + + logger.info( + f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}" + ) + + chunk = stream.read() + + print(type(chunk)) + print(len(chunk)) + + # Extract the offset and length (8 bytes each) and the data + offset = int.from_bytes(chunk[:8], "big") + length = int.from_bytes(chunk[8:16], "big") + data = chunk[16 : 16 + length] + print(f"Writing {length} bytes to {offset}") + written = image.write(data, offset) + print(f"Wrote {written} bytes") + + image.close() + ioctx.close() + cluster.shutdown() + + +@ZKConnection(config) +def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot): + """ + Create the snapshot of a remote volume + """ + import rados + import rbd + + cluster = rados.Rados(conffile="/etc/ceph/ceph.conf") + cluster.connect() + ioctx = cluster.open_ioctx(pool) + image = rbd.Image(ioctx, volume) + image.create_snap(snapshot) + image.close() + ioctx.close() + cluster.shutdown() + + retflag, retdata = pvc_ceph.add_snapshot( + zkhandler, pool, volume, snapshot, zk_only=True + ) + if not retflag: + + if retflag: + retcode = 200 + else: + retcode = 400 + + output = {"message": retdata.replace('"', "'")} + return output, retcode @ZKConnection(config) diff --git a/daemon-common/vm.py b/daemon-common/vm.py index e638381a..97addd91 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -3269,7 +3269,7 @@ def vm_worker_send_snapshot( verify=destination_api_verify_ssl, ) destination_vm_status = response.json() - if len(destination_vm_status) > 0: + if type(destination_vm_status) is list and len(destination_vm_status) > 0: destination_vm_status = destination_vm_status[0] else: destination_vm_status = {} @@ -3358,10 +3358,43 @@ def vm_worker_send_snapshot( # Begin send, set stages total_stages = ( 2 - + (3 * len(snapshot_rbdsnaps)) + + (2 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0) ) + current_stage += 1 + update( + celery, + f"Sending VM configuration for {vm_name}@{snapshot_name}", + current=current_stage, + total=total_stages, + ) + + send_params = { + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + send_headers = { + "X-Api-Key": destination_api_key, + "Content-Type": "application/json", + } + try: + response = requests.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", + timeout=destination_api_timeout, + headers=send_headers, + params=send_params, + json=vm_detail, + verify=destination_api_verify_ssl, + ) + response.raise_for_status() + except Exception as e: + fail( + celery, + f"Failed to send config: {e}", + ) + return False + # Create the block devices on the remote side if this is a new VM send for rbd_detail in [r for r in vm_detail["disks"] if r["type"] == "rbd"]: rbd_name = rbd_detail["name"] @@ -3429,30 +3462,85 @@ def vm_worker_send_snapshot( ioctx = cluster.open_ioctx(pool) image = rbd.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True) size = image.size() - chunk_size_mb = 128 + chunk_size_mb = 64 if incremental_parent is not None: # Diff between incremental_parent and snapshot - celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd_name}" - - def diff_chunker(): - def diff_cb(offset, length, exists): - """Callback to handle diff regions""" - if exists: - data = image.read(offset, length) - yield ( - offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data - ) - - image.set_snap(incremental_parent) - image.diff_iterate(0, size, incremental_parent, diff_cb) - - data_stream = diff_chunker() + celery_message = ( + f"Sending diff {incremental_parent}>{snapshot_name} for {rbd_name}" + ) else: # Full image transfer celery_message = f"Sending full image of {rbd_name}@{snapshot_name}" - def chunker(): + current_stage += 1 + update( + celery, + celery_message, + current=current_stage, + total=total_stages, + ) + + send_headers = { + "X-Api-Key": destination_api_key, + "Content-Type": "application/octet-stream", + "Transfer-Encoding": None, # Disable chunked transfer encoding + } + + if incremental_parent is not None: + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "source_snapshot": incremental_parent, + } + + last_chunk_time = time.time() + + def diff_cb_send(offset, length, exists): + nonlocal last_chunk_time + if exists: + data = image.read(offset, length) + block = offset.to_bytes(8, "big") + length.to_bytes(8, "big") + data + response = requests.put( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + timeout=destination_api_timeout, + headers=send_headers, + params=send_params, + data=block, + verify=destination_api_verify_ssl, + ) + response.raise_for_status() + + current_chunk_time = time.time() + chunk_time = current_chunk_time - last_chunk_time + last_chunk_time = current_chunk_time + chunk_speed = round(4 / chunk_time, 1) + update( + celery, + celery_message + f" ({chunk_speed} MB/s)", + current=current_stage, + total=total_stages, + ) + + try: + image.set_snap(snapshot_name) + image.diff_iterate( + 0, size, incremental_parent, diff_cb_send, whole_object=True + ) + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() + else: + + def full_chunker(): chunk_size = 1024 * 1024 * chunk_size_mb current_chunk = 0 last_chunk_time = time.time() @@ -3471,35 +3559,46 @@ def vm_worker_send_snapshot( total=total_stages, ) - data_stream = chunker() + send_params = { + "pool": pool, + "volume": volume, + "snapshot": snapshot_name, + "size": size, + "source_snapshot": incremental_parent, + } - current_stage += 1 - update( - celery, - celery_message, - current=current_stage, - total=total_stages, - ) + try: + response = requests.post( + f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", + timeout=destination_api_timeout, + headers=send_headers, + params=send_params, + data=full_chunker(), + verify=destination_api_verify_ssl, + ) + response.raise_for_status() + except Exception: + fail( + celery, + f"Failed to send snapshot: {response.json()['message']}", + ) + return False + finally: + image.close() + ioctx.close() + cluster.shutdown() send_params = { "pool": pool, "volume": volume, "snapshot": snapshot_name, - "size": size, - "source_snapshot": incremental_parent, - } - send_headers = { - "X-Api-Key": destination_api_key, - "Content-Type": "application/octet-stream", - "Transfer-Encoding": None, # Disable chunked transfer encoding } try: - response = requests.post( + response = requests.patch( f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", timeout=destination_api_timeout, headers=send_headers, params=send_params, - data=data_stream, verify=destination_api_verify_ssl, ) response.raise_for_status() @@ -3514,39 +3613,6 @@ def vm_worker_send_snapshot( ioctx.close() cluster.shutdown() - current_stage += 1 - update( - celery, - f"Sending VM configuration for {vm_name}@{snapshot_name}", - current=current_stage, - total=total_stages, - ) - - send_params = { - "snapshot": snapshot_name, - "source_snapshot": incremental_parent, - } - send_headers = { - "X-Api-Key": destination_api_key, - "Content-Type": "application/json", - } - try: - response = requests.post( - f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", - timeout=destination_api_timeout, - headers=send_headers, - params=send_params, - json=vm_detail, - verify=destination_api_verify_ssl, - ) - response.raise_for_status() - except Exception as e: - fail( - celery, - f"Failed to send config: {e}", - ) - return False - current_stage += 1 return finish( celery,