diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index 0c325bd7..4db6b1cd 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -3777,7 +3777,7 @@ class API_VM_Snapshot_Receive_Block(Resource):
         """
         return api_helper.vm_snapshot_receive_block(
             reqargs.get("pool"),
-            reqargs.get("volume") + "_recv",
+            reqargs.get("volume"),
             reqargs.get("snapshot"),
             int(reqargs.get("size")),
             flask.request.stream,
@@ -3788,6 +3788,67 @@ class API_VM_Snapshot_Receive_Block(Resource):
 api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
 
 
+# /vm/<vm>/snapshot/receive/config
+class API_VM_Snapshot_Receive_Config(Resource):
+    @RequestParser(
+        [
+            {
+                "name": "snapshot",
+                "required": True,
+            },
+            {
+                "name": "source_snapshot",
+                "required": False,
+            },
+        ]
+    )
+    @Authenticator
+    def post(self, vm, reqargs):
+        """
+        Receive a snapshot of a VM configuration from another PVC cluster
+
+        NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
+        ---
+        tags:
+          - vm
+        parameters:
+          - in: query
+            name: snapshot
+            type: string
+            required: true
+            description: The name of the VM snapshot being received
+          - in: query
+            name: source_snapshot
+            type: string
+            required: false
+            description: The name of the parent snapshot for incremental transfers
+        responses:
+          200:
+            description: OK
+            schema:
+              type: object
+              id: Message
+          400:
+            description: Execution error
+            schema:
+              type: object
+              id: Message
+          404:
+            description: Not found
+            schema:
+              type: object
+              id: Message
+        """
+        return api_helper.vm_snapshot_receive_config(
+            reqargs.get("snapshot"),
+            flask.request.get_json(),
+            source_snapshot=reqargs.get("source_snapshot"),
+        )
+
+
+api.add_resource(API_VM_Snapshot_Receive_Config, "/vm/<vm>/snapshot/receive/config")
+
+
 # /vm/autobackup
 class API_VM_Autobackup_Root(Resource):
     @RequestParser(
diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py
index 8c4190d7..282f4ded 100755
--- a/api-daemon/pvcapid/helper.py
+++ b/api-daemon/pvcapid/helper.py
@@ -21,7 +21,9 @@
 
 import flask
 import json
+import logging
 import lxml.etree as etree
+import sys
 
 from re import match
 from requests import get
@@ -40,6 +42,15 @@
 import daemon_lib.network as pvc_network
 import daemon_lib.ceph as pvc_ceph
 
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+handler = logging.StreamHandler(sys.stdout)
+handler.setLevel(logging.INFO)
+formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+
 #
 # Cluster base functions
 #
@@ -1294,20 +1305,46 @@ def vm_flush_locks(zkhandler, vm):
     return output, retcode
 
 
+@ZKConnection(config)
 def vm_snapshot_receive_block(
-    pool, volume, snapshot, size, stream, source_snapshot=None
+    zkhandler, pool, volume, snapshot, size, stream, source_snapshot=None
 ):
+    """
+    Receive an RBD volume from a remote system
+    """
     try:
         import rados
         import rbd
 
+        _, rbd_detail = pvc_ceph.get_list_volume(
+            zkhandler, pool, limit=volume, is_fuzzy=False
+        )
+        if len(rbd_detail) > 0:
+            volume_exists = True
+        else:
+            volume_exists = False
+
         cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
         cluster.connect()
         ioctx = cluster.open_ioctx(pool)
 
-        if not source_snapshot:
+        if not source_snapshot and not volume_exists:
             rbd_inst = rbd.RBD()
             rbd_inst.create(ioctx, volume, size)
+            retflag, retdata = pvc_ceph.add_volume(
+                zkhandler, pool, volume, str(size) + "B", force_flag=True, zk_only=True
+            )
+            if not retflag:
+                ioctx.close()
+                cluster.shutdown()
+
+                if retflag:
+                    retcode = 200
+                else:
+                    retcode = 400
+
+                output = {"message": retdata.replace('"', "'")}
+                return output, retcode
 
         image = rbd.Image(ioctx, volume)
 
@@ -1316,7 +1353,9 @@ def vm_snapshot_receive_block(
 
         if source_snapshot:
             # Receiving diff data
-            print(f"Applying diff between {source_snapshot} and {snapshot}")
+            logger.info(
+                f"Applying diff between {pool}/{volume}@{source_snapshot} and {snapshot}"
+            )
             while True:
                 chunk = stream.read(chunk_size)
                 if not chunk:
@@ -1327,11 +1366,9 @@ def vm_snapshot_receive_block(
                 length = int.from_bytes(chunk[8:16], "big")
                 data = chunk[16 : 16 + length]
                 image.write(data, offset)
-
-            image.create_snap(snapshot)
         else:
             # Receiving full image
-            print(f"Importing full snapshot {snapshot}")
+            logger.info(f"Importing full snapshot {pool}/{volume}@{snapshot}")
             while True:
                 chunk = flask.request.stream.read(chunk_size)
                 if not chunk:
@@ -1339,7 +1376,22 @@ def vm_snapshot_receive_block(
                 image.write(chunk, last_chunk)
                 last_chunk += len(chunk)
 
-            image.create_snap(snapshot)
+        image.create_snap(snapshot)
+        retflag, retdata = pvc_ceph.add_snapshot(
+            zkhandler, pool, volume, snapshot, zk_only=True
+        )
+        if not retflag:
+            image.close()
+            ioctx.close()
+            cluster.shutdown()
+
+            if retflag:
+                retcode = 200
+            else:
+                retcode = 400
+
+            output = {"message": retdata.replace('"', "'")}
+            return output, retcode
 
         image.close()
         ioctx.close()
@@ -1348,6 +1400,183 @@ def vm_snapshot_receive_block(
         return {"message": f"Failed to import block device: {e}"}, 400
 
 
+@ZKConnection(config)
+def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=None):
+    """
+    Receive a VM configuration from a remote system
+
+    This function requires some explanation.
+
+    We get a full JSON dump of the VM configuration as provided by `pvc vm info`. This contains all the information we
+    reasonably need to replicate the VM at the given snapshot, including metainformation.
+
+    First, we need to determine whether this is an incremental or a full send. If it is a full send and the VM already
+    exists, that is an error and we must fail; in practice, though, the RBD volume transfer should already have caught
+    that case.
+ """ + print(vm_config) + + def parse_unified_diff(diff_text, original_text): + """ + Take a unified diff and apply it to an original string + """ + # Split the original string into lines + original_lines = original_text.splitlines(keepends=True) + patched_lines = [] + original_idx = 0 # Track position in original lines + + diff_lines = diff_text.splitlines(keepends=True) + + for line in diff_lines: + if line.startswith("---") or line.startswith("+++"): + # Ignore prefix lines + continue + if line.startswith("@@"): + # Extract line numbers from the diff hunk header + hunk_header = line + parts = hunk_header.split(" ") + original_range = parts[1] + + # Get the starting line number and range length for the original file + original_start, _ = map(int, original_range[1:].split(",")) + + # Adjust for zero-based indexing + original_start -= 1 + + # Add any lines between the current index and the next hunk's start + while original_idx < original_start: + patched_lines.append(original_lines[original_idx]) + original_idx += 1 + + elif line.startswith("-"): + # This line should be removed from the original, skip it + original_idx += 1 + elif line.startswith("+"): + # This line should be added to the patched version, removing the '+' + patched_lines.append(line[1:]) + else: + # Context line (unchanged), it has no prefix, add from the original + patched_lines.append(original_lines[original_idx]) + original_idx += 1 + + # Add any remaining lines from the original file after the last hunk + patched_lines.extend(original_lines[original_idx:]) + + return "".join(patched_lines).strip() + + # Get our XML configuration for this snapshot + # We take the main XML configuration, then apply the diff for this particular incremental + current_snapshot = [s for s in vm_config["snapshots"] if s["name"] == snapshot][0] + vm_xml = vm_config["xml"] + vm_xml_diff = "\n".join(current_snapshot["xml_diff_lines"]) + snapshot_vm_xml = parse_unified_diff(vm_xml_diff, vm_xml) + + if ( + source_snapshot is not None + or pvc_vm.searchClusterByUUID(zkhandler, vm_config["uuid"]) is not None + ): + logger.info( + f"Receiving incremental VM configuration for {vm_config['name']}@{snapshot}" + ) + + # Modify the VM based on our passed detail + retcode, retmsg = pvc_vm.modify_vm( + zkhandler, + vm_config["uuid"], + False, + snapshot_vm_xml, + ) + retcode, retmsg = pvc_vm.modify_vm_metadata( + zkhandler, + vm_config["uuid"], + None, # Node limits are left unchanged + vm_config["node_selector"], + vm_config["node_autostart"], + vm_config["profile"], + vm_config["migration_method"], + vm_config["migration_max_downtime"], + ) + + current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"])) + new_vm_tags = [t["name"] for t in vm_config["tags"]] + remove_tags = [] + add_tags = [] + for tag in vm_config["tags"]: + if tag["name"] not in current_vm_tags: + add_tags.append((tag["name"], tag["protected"])) + for tag in current_vm_tags: + if tag not in new_vm_tags: + remove_tags.append(tag) + + for tag in add_tags: + name, protected = tag + pvc_vm.modify_vm_tag( + zkhandler, vm_config["uuid"], "add", name, protected=protected + ) + for tag in remove_tags: + pvc_vm.modify_vm_tag(zkhandler, vm_config["uuid"], "remove", name) + else: + logger.info( + f"Receiving full VM configuration for {vm_config['name']}@{snapshot}" + ) + + # Define the VM based on our passed detail + retcode, retmsg = pvc_vm.define_vm( + zkhandler, + snapshot_vm_xml, + None, # Target node is autoselected + None, # Node limits are invalid here so ignore them + 
vm_config["node_selector"], + vm_config["node_autostart"], + vm_config["migration_method"], + vm_config["migration_max_downtime"], + vm_config["profile"], + vm_config["tags"], + "mirror", + ) + + # Add this snapshot to the VM manually in Zookeeper + zkhandler.write( + [ + ( + ( + "domain.snapshots", + vm_config["uuid"], + "domain_snapshot.name", + snapshot, + ), + snapshot, + ), + ( + ( + "domain.snapshots", + vm_config["uuid"], + "domain_snapshot.timestamp", + snapshot, + ), + current_snapshot["timestamp"], + ), + ( + ( + "domain.snapshots", + vm_config["uuid"], + "domain_snapshot.xml", + snapshot, + ), + snapshot_vm_xml, + ), + ( + ( + "domain.snapshots", + vm_config["uuid"], + "domain_snapshot.rbd_snapshots", + snapshot, + ), + ",".join(current_snapshot["rbd_snapshots"]), + ), + ] + ) + + # # Network functions # diff --git a/daemon-common/ceph.py b/daemon-common/ceph.py index c5cdd5d9..e6a871d1 100644 --- a/daemon-common/ceph.py +++ b/daemon-common/ceph.py @@ -1168,11 +1168,14 @@ def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzz continue if target_volume and volume_name != target_volume: continue - snapshot_stats = json.loads( - zkhandler.read( - ("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}") + try: + snapshot_stats = json.loads( + zkhandler.read( + ("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}") + ) ) - ) + except Exception: + snapshot_stats = [] if limit: try: if re.fullmatch(limit, snapshot_name): diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 20d9c1bc..e638381a 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -3184,6 +3184,8 @@ def vm_worker_send_snapshot( ) return False + vm_name = vm_detail["name"] + # Validate that the destination cluster can be reached destination_api_timeout = (3.05, 172800) destination_api_headers = { @@ -3267,6 +3269,11 @@ def vm_worker_send_snapshot( verify=destination_api_verify_ssl, ) destination_vm_status = response.json() + if len(destination_vm_status) > 0: + destination_vm_status = destination_vm_status[0] + else: + destination_vm_status = {} + current_destination_vm_state = destination_vm_status.get("state", None) if ( current_destination_vm_state is not None @@ -3351,7 +3358,7 @@ def vm_worker_send_snapshot( # Begin send, set stages total_stages = ( 2 - + (2 * len(snapshot_rbdsnaps)) + + (3 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0) ) @@ -3384,7 +3391,7 @@ def vm_worker_send_snapshot( return False try: - size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"]) + _ = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"]) except Exception as e: error_message = f"Failed to get volume size for {rbd_name}: {e}" @@ -3395,7 +3402,7 @@ def vm_worker_send_snapshot( current_stage += 1 update( celery, - f"Creating remote volume {pool}/{volume} for {rbd_name}@{snapshot_name}", + f"Checking for remote volume {rbd_name}", current=current_stage, total=total_stages, ) @@ -3416,26 +3423,6 @@ def vm_worker_send_snapshot( ) return False - # Create the volume on the target - params = { - "size": size_bytes, - } - response = requests.post( - f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}", - timeout=destination_api_timeout, - headers=destination_api_headers, - params=params, - data=None, - verify=destination_api_verify_ssl, - ) - destination_volume_create_status = response.json() - if response.status_code != 200: - fail( - celery, - f"Failed to create volume {rbd_name} on target: 
-            )
-            return False
-
         # Send the volume to the remote
         cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
         cluster.connect()
@@ -3508,7 +3495,7 @@ def vm_worker_send_snapshot(
         }
         try:
             response = requests.post(
-                f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
+                f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
                 timeout=destination_api_timeout,
                 headers=send_headers,
                 params=send_params,
@@ -3516,10 +3503,10 @@ def vm_worker_send_snapshot(
                 verify=destination_api_verify_ssl,
             )
             response.raise_for_status()
-        except Exception as e:
+        except Exception:
             fail(
                 celery,
-                f"Failed to send snapshot: {e}",
+                f"Failed to send snapshot: {response.json()['message']}",
             )
             return False
         finally:
@@ -3527,10 +3514,43 @@ def vm_worker_send_snapshot(
             ioctx.close()
             cluster.shutdown()
 
-    # Send the VM configuration
-    # if current_destination_vm_state is None:
-    #     This is a new VM, so define it
-    #     response = requests.post()
-    # else:
-    #     This is a modification
-    #     response = requests.post()
+    current_stage += 1
+    update(
+        celery,
+        f"Sending VM configuration for {vm_name}@{snapshot_name}",
+        current=current_stage,
+        total=total_stages,
+    )
+
+    send_params = {
+        "snapshot": snapshot_name,
+        "source_snapshot": incremental_parent,
+    }
+    send_headers = {
+        "X-Api-Key": destination_api_key,
+        "Content-Type": "application/json",
+    }
+    try:
+        response = requests.post(
+            f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
+            timeout=destination_api_timeout,
+            headers=send_headers,
+            params=send_params,
+            json=vm_detail,
+            verify=destination_api_verify_ssl,
+        )
+        response.raise_for_status()
+    except Exception as e:
+        fail(
+            celery,
+            f"Failed to send config: {e}",
+        )
+        return False
+
+    current_stage += 1
+    return finish(
+        celery,
+        f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
+        current=current_stage,
+        total=total_stages,
+    )