Add VM automirror support

Allows shipping snapshots automatically to remote clusters on a cron,
identically to how autobackup handles local snapshot exports.

VMs are selected based on configured tags, and individual destination
clusters can be specified via a colon-separated suffix on the
tag(s).

Automirror snapshots use the prefix "am" (analogous to "ab" for
autobackups) to differentiate them from normal "mr" mirrors.
This commit is contained in:
2024-11-15 01:37:27 -05:00
parent cebc660fb0
commit 078d48a50b
8 changed files with 1010 additions and 120 deletions

496
daemon-common/automirror.py Normal file
View File

@ -0,0 +1,496 @@
#!/usr/bin/env python3
# automirror.py - PVC API Automirror functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import requests
from datetime import datetime
from os import popen
from daemon_lib.config import get_automirror_configuration
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
import daemon_lib.vm as vm
def send_execution_failure_report(
    celery, config, recipients=None, total_time=0, error=None
):
    """
    Email a failure report for an automirror run via the local sendmail.

    Args:
        celery: Celery task handle, passed through to the log_* helpers.
        config: Automirror configuration dict; "cluster" is used in the subject.
        recipients: List of recipient addresses; if None, no report is sent.
        total_time: Run duration (seconds) included in the report body.
        error: The error message to include in the report body.
    """
    if recipients is None:
        return
    from email.utils import formatdate
    from socket import gethostname
    log_message = f"Sending email failure report to {', '.join(recipients)}"
    log_info(celery, log_message)
    current_datetime = datetime.now()
    # BUG FIX: strftime("%s") is a non-standard glibc extension; timestamp()
    # is the portable way to get the epoch time for formatdate()
    email_datetime = formatdate(current_datetime.timestamp())
    email = list()
    email.append(f"Date: {email_datetime}")
    email.append(
        f"Subject: PVC Automirror execution failure for cluster '{config['cluster']}'"
    )
    email_to = list()
    for recipient in recipients:
        email_to.append(f"<{recipient}>")
    email.append(f"To: {', '.join(email_to)}")
    email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
    # Blank line separates headers from the body (RFC 5322)
    email.append("")
    email.append(
        f"A PVC automirror has FAILED at {current_datetime} in {total_time}s due to an execution error."
    )
    email.append("")
    email.append("The reported error message is:")
    email.append(f" {error}")
    try:
        with popen("/usr/sbin/sendmail -t", "w") as p:
            p.write("\n".join(email))
    except Exception as e:
        # BUG FIX: log_err takes the celery handle as its first argument
        # (matches every other log_* call in this module)
        log_err(celery, f"Failed to send report email: {e}")
def send_execution_summary_report(
    celery,
    config,
    recipients=None,
    total_time=0,
    summary=None,
    local_deleted_snapshots=None,
):
    """
    Email a summary report of a completed automirror run via the local sendmail.

    Args:
        celery: Celery task handle, passed through to the log_* helpers.
        config: Automirror configuration dict; "cluster" is used in the subject.
        recipients: List of recipient addresses; if None, no report is sent.
        total_time: Run duration (timedelta) included in the report body.
        summary: Per-job results keyed by "<vm_name>:<destination_name>".
        local_deleted_snapshots: Per-VM lists of local snapshots removed
            during cleanup, keyed by VM name.
    """
    if recipients is None:
        return
    # Mutable defaults replaced with None sentinels; normalize here
    if summary is None:
        summary = dict()
    if local_deleted_snapshots is None:
        local_deleted_snapshots = dict()
    from email.utils import formatdate
    from socket import gethostname
    log_message = f"Sending email summary report to {', '.join(recipients)}"
    log_info(celery, log_message)
    current_datetime = datetime.now()
    # BUG FIX: strftime("%s") is a non-standard glibc extension; timestamp()
    # is the portable way to get the epoch time for formatdate()
    email_datetime = formatdate(current_datetime.timestamp())
    email = list()
    email.append(f"Date: {email_datetime}")
    email.append(f"Subject: PVC Automirror report for cluster '{config['cluster']}'")
    email_to = list()
    for recipient in recipients:
        email_to.append(f"<{recipient}>")
    email.append(f"To: {', '.join(email_to)}")
    email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
    # Blank line separates headers from the body (RFC 5322)
    email.append("")
    email.append(
        f"A PVC automirror has been completed at {current_datetime} in {total_time}."
    )
    email.append("")
    email.append(
        "The following is a summary of all VM mirror jobs executed during this run:"
    )
    email.append("")
    # Summary keys are "<vm_name>:<destination_name>"
    vm_names = {k.split(":")[0] for k in summary.keys()}
    for vm_name in vm_names:
        email.append(f"VM {vm_name}:")
        email.append(" Mirror jobs:")
        for destination_name in {
            k.split(":")[1] for k in summary.keys() if k.split(":")[0] == vm_name
        }:
            mirror = summary[f"{vm_name}:{destination_name}"]
            # Snapshot names are "am<YYYYMMDDHHMMSS>"; recover the datestamp
            datestring = mirror.get("snapshot_name").replace("am", "")
            mirror_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
            if mirror.get("result", False):
                email.append(
                    f" * {mirror_date}: Success to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
                )
            else:
                email.append(
                    f" * {mirror_date}: Failure to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
                )
                email.append(
                    f" {mirror.get('result_message')}"
                )
        email.append(
            " The following aged-out local snapshots were removed during cleanup:"
        )
        # BUG FIX: a VM whose snapshot creation failed appears in summary but
        # never gets a local_deleted_snapshots entry; use .get() to avoid a
        # KeyError that would abort the whole report
        for snapshot in local_deleted_snapshots.get(vm_name, []):
            email.append(f" * {snapshot}")
    try:
        with popen("/usr/sbin/sendmail -t", "w") as p:
            p.write("\n".join(email))
    except Exception as e:
        # BUG FIX: log_err takes the celery handle as its first argument
        # (matches every other log_* call in this module)
        log_err(celery, f"Failed to send report email: {e}")
def run_vm_mirror(
    zkhandler, celery, config, vm_detail, snapshot_name, destination_name
):
    """
    Mirror a single VM snapshot to one destination cluster.

    Sends snapshot_name of the VM to the destination, incrementally against
    the most recent "am" snapshot already present on the remote side, then
    prunes remote "am" snapshots beyond the configured keep count.

    Returns:
        (True, [pruned remote snapshot names]) on success, or
        (False, error message string) on failure.
    """
    vm_name = vm_detail["name"]
    keep_count = config["mirror_keep_snapshots"]
    try:
        destination = config["mirror_destinations"][destination_name]
    except Exception:
        error_message = f"Failed to find valid destination cluster '{destination_name}' for VM '{vm_name}'"
        log_err(celery, error_message)
        # BUG FIX: all callers unpack a (result, message) tuple; returning a
        # bare string here raised a ValueError at the call site
        return False, error_message
    destination_api_uri = f"{'https' if destination['ssl'] else 'http'}://{destination['address']}:{destination['port']}{destination['prefix']}"
    # (connect timeout, read timeout); sends can legitimately run for days
    destination_api_timeout = (3.05, 172800)
    destination_api_headers = {
        "X-Api-Key": destination["key"],
    }
    session = requests.Session()
    session.headers.update(destination_api_headers)
    session.verify = destination["verify_ssl"]
    # BUG FIX: requests.Session has no functional "timeout" attribute; the
    # timeout must be passed per-request or it is silently ignored
    # Get the last snapshot that is on the remote side for incrementals
    response = session.get(
        f"{destination_api_uri}/vm/{vm_name}",
        timeout=destination_api_timeout,
        params=None,
        data=None,
    )
    destination_vm_detail = response.json()
    if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
        destination_vm_detail = destination_vm_detail[0]
        try:
            # Snapshot lists are date-descending, so [0] is the most recent
            last_snapshot_name = [
                s
                for s in destination_vm_detail["snapshots"]
                if s["name"].startswith("am")
            ][0]["name"]
        except Exception:
            last_snapshot_name = None
    else:
        last_snapshot_name = None
    # Send the current snapshot
    result, message = vm.vm_worker_send_snapshot(
        zkhandler,
        None,
        vm_name,
        snapshot_name,
        destination_api_uri,
        destination["key"],
        destination_api_verify_ssl=destination["verify_ssl"],
        incremental_parent=last_snapshot_name,
        destination_storage_pool=destination["pool"],
        return_status=True,
    )
    if not result:
        return False, message
    # Re-fetch the remote VM to see the post-send snapshot list
    response = session.get(
        f"{destination_api_uri}/vm/{vm_name}",
        timeout=destination_api_timeout,
        params=None,
        data=None,
    )
    destination_vm_detail = response.json()
    if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
        destination_vm_detail = destination_vm_detail[0]
    else:
        message = "Remote VM somehow does not exist after successful mirror; skipping snapshot cleanup"
        return False, message
    # Find any mirror snapshots that are expired
    remote_snapshots = [
        s for s in destination_vm_detail["snapshots"] if s["name"].startswith("am")
    ]
    # Snapshots are in dated descending order due to the names
    if len(remote_snapshots) > keep_count:
        remote_marked_for_deletion = [s["name"] for s in remote_snapshots[keep_count:]]
    else:
        remote_marked_for_deletion = list()
    for snapshot in remote_marked_for_deletion:
        log_info(
            celery,
            f"VM {vm_detail['name']} removing stale remote automirror snapshot {snapshot}",
        )
        session.delete(
            f"{destination_api_uri}/vm/{vm_name}/snapshot",
            timeout=destination_api_timeout,
            params={
                "snapshot_name": snapshot,
            },
            data=None,
        )
    session.close()
    return True, remote_marked_for_deletion
def worker_cluster_automirror(
    zkhandler,
    celery,
    force_full=False,
    email_recipients=None,
    email_errors_only=False,
):
    """
    Celery worker entrypoint: automirror all tagged VMs to their destinations.

    For each VM carrying a configured mirror tag, create an "am"-prefixed
    snapshot, send it to each destination cluster encoded in the tag suffix
    (or the configured default destination), and remove local "am" snapshots
    that every destination has also aged out. Optionally emails a summary
    (or failure) report to email_recipients.

    NOTE(review): force_full is accepted but never referenced in this body;
    confirm whether it should be forwarded to the send logic.
    """
    config = get_automirror_configuration()
    mirror_summary = dict()
    local_deleted_snapshots = dict()
    current_stage = 0
    total_stages = 1
    start(
        celery,
        f"Starting cluster '{config['cluster']}' VM automirror",
        current=current_stage,
        total=total_stages,
    )
    # Bail out early (successfully) if automirrors are not configured at all
    if not config["automirror_enabled"]:
        message = "Automirrors are not configured on this cluster."
        log_info(celery, message)
        return finish(
            celery,
            message,
            current=total_stages,
            total=total_stages,
        )
    # Reserve an extra progress stage for the email report step
    if email_recipients is not None:
        total_stages += 1
    automirror_start_time = datetime.now()
    retcode, vm_list = vm.get_list(zkhandler)
    if not retcode:
        error_message = f"Failed to fetch VM list: {vm_list}"
        log_err(celery, error_message)
        current_stage += 1
        send_execution_failure_report(
            celery,
            config,
            recipients=email_recipients,
            error=error_message,
        )
        fail(celery, error_message)
        return False
    # Select VMs to mirror and resolve their destination cluster(s) from tags
    mirror_vms = list()
    for vm_detail in vm_list:
        mirror_vm = {
            "detail": vm_detail,
            "destinations": list(),
        }
        vm_tag_names = [t["name"] for t in vm_detail["tags"]]
        # Check if any of the mirror tags are present; if they are, then we should mirror
        vm_mirror_tags = list()
        for tag in vm_tag_names:
            if tag.split(":")[0] in config["mirror_tags"]:
                vm_mirror_tags.append(tag)
        # There are no mirror tags, so skip this VM
        if len(vm_mirror_tags) < 1:
            continue
        # Go through each tag to extract the cluster
        target_clusters = set()
        for tag in vm_mirror_tags:
            if len(tag.split(":")) == 1:
                # This is a direct match without any cluster suffix, so use the default
                target_clusters.add(config["mirror_default_destination"])
            if len(tag.split(":")) > 1:
                # This has a cluster suffix, so use that
                target_clusters.add(tag.split(":")[1])
        for cluster in target_clusters:
            mirror_vm["destinations"].append(cluster)
        mirror_vms.append(mirror_vm)
    if len(mirror_vms) < 1:
        message = "Found no VMs tagged for automirror."
        log_info(celery, message)
        return finish(
            celery,
            message,
            current=total_stages,
            total=total_stages,
        )
    # One progress stage per selected VM
    total_stages += len(mirror_vms)
    mirror_vm_names = set([b["detail"]["name"] for b in mirror_vms])
    log_info(
        celery,
        f"Found {len(mirror_vm_names)} suitable VM(s) for automirror: {', '.join(mirror_vm_names)}",
    )
    # Execute the backup: take a snapshot, then export the snapshot
    for mirror_vm in mirror_vms:
        vm_detail = mirror_vm["detail"]
        vm_destinations = mirror_vm["destinations"]
        current_stage += 1
        update(
            celery,
            f"Performing automirror of VM {vm_detail['name']}",
            current=current_stage,
            total=total_stages,
        )
        # Automirrors use a custom name to allow them to be properly cleaned up later
        now = datetime.now()
        datestring = now.strftime("%Y%m%d%H%M%S")
        snapshot_name = f"am{datestring}"
        result, message = vm.vm_worker_create_snapshot(
            zkhandler,
            None,
            vm_detail["name"],
            snapshot_name=snapshot_name,
            return_status=True,
        )
        if not result:
            # Snapshot creation failed; record the failure against every
            # destination for this VM and move on.
            # NOTE(review): this VM never gets a local_deleted_snapshots
            # entry; confirm the summary report tolerates the missing key.
            for destination in vm_destinations:
                mirror_summary[f"{vm_detail['name']}:{destination}"] = {
                    "result": result,
                    "snapshot_name": snapshot_name,
                    "runtime_secs": 0,
                    "result_message": message,
                }
            continue
        # Per-destination lists of remote snapshots pruned by run_vm_mirror
        remote_marked_for_deletion = dict()
        for destination in vm_destinations:
            mirror_start = datetime.now()
            result, ret = run_vm_mirror(
                zkhandler,
                celery,
                config,
                vm_detail,
                snapshot_name,
                destination,
            )
            mirror_end = datetime.now()
            runtime_secs = (mirror_end - mirror_start).seconds
            if result:
                # On success, ret is the list of pruned remote snapshot names
                remote_marked_for_deletion[destination] = ret
                mirror_summary[f"{vm_detail['name']}:{destination}"] = {
                    "result": result,
                    "snapshot_name": snapshot_name,
                    "runtime_secs": runtime_secs,
                }
            else:
                # On failure, ret is the error message
                log_warn(
                    celery,
                    f"Error in mirror send: {ret}",
                )
                mirror_summary[f"{vm_detail['name']}:{destination}"] = {
                    "result": result,
                    "snapshot_name": snapshot_name,
                    "runtime_secs": runtime_secs,
                    "result_message": ret,
                }
        # Find all local snapshots that were present in all remote snapshot deletions,
        # then remove them
        # If one of the sends fails, this should result in nothing being removed
        if remote_marked_for_deletion:
            all_lists = [set(lst) for lst in remote_marked_for_deletion.values() if lst]
            if all_lists:
                # Only delete locally what EVERY destination has aged out
                local_marked_for_deletion = set.intersection(*all_lists)
            else:
                local_marked_for_deletion = set()
        else:
            local_marked_for_deletion = set()
        for snapshot in local_marked_for_deletion:
            log_info(
                celery,
                f"VM {vm_detail['name']} removing stale local automirror snapshot {snapshot}",
            )
            vm.vm_worker_remove_snapshot(
                zkhandler,
                None,
                vm_detail["name"],
                snapshot,
            )
        local_deleted_snapshots[vm_detail["name"]] = local_marked_for_deletion
    automirror_end_time = datetime.now()
    automirror_total_time = automirror_end_time - automirror_start_time
    # Decide whether to email the run summary
    if email_recipients is not None:
        current_stage += 1
        if email_errors_only and not all(
            [s["result"] for _, s in mirror_summary.items()]
        ):
            # Send report if we're in errors only and at least one send failed
            send_report = True
        elif not email_errors_only:
            # Send report if we're not in errors only
            send_report = True
        else:
            # Otherwise (errors only and all successful) don't send
            send_report = False
        if send_report:
            update(
                celery,
                "Sending automirror results summary email",
                current=current_stage,
                total=total_stages,
            )
            send_execution_summary_report(
                celery,
                config,
                recipients=email_recipients,
                total_time=automirror_total_time,
                summary=mirror_summary,
                local_deleted_snapshots=local_deleted_snapshots,
            )
        else:
            update(
                celery,
                "Skipping automirror results summary email (no failures)",
                current=current_stage,
                total=total_stages,
            )
    current_stage += 1
    return finish(
        celery,
        f"Successfully completed cluster '{config['cluster']}' VM automirror",
        current=current_stage,
        total=total_stages,
    )

View File

@ -481,6 +481,64 @@ def get_autobackup_configuration():
return config
def get_parsed_automirror_configuration(config_file):
    """
    Parse the automirror configuration out of the given pvc.conf file.

    Returns a flat config dict with the cluster name, an "automirror_enabled"
    flag, and (when enabled) the mirror tag list, destination map, default
    destination, and snapshot keep count.

    Raises:
        MalformedConfigurationError: if required keys are missing or the
            default destination is not one of the configured destinations.
    """
    print('Loading configuration from file "{}"'.format(config_file))
    with open(config_file, "r") as cfgfh:
        try:
            o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
        except Exception as e:
            # An unparseable pvc.conf is fatal for the whole process
            print(f"ERROR: Failed to parse configuration file: {e}")
            os._exit(1)
    config = dict()
    try:
        o_cluster = o_config["cluster"]
        config_cluster = {
            "cluster": o_cluster["name"],
            "automirror_enabled": True,
        }
        config = {**config, **config_cluster}
        o_automirror = o_config["automirror"]
        if o_automirror is None:
            # An empty automirror section means the feature is disabled
            config["automirror_enabled"] = False
            return config
        config_automirror = {
            "mirror_tags": o_automirror["mirror_tags"],
            "mirror_destinations": o_automirror["destinations"],
            "mirror_default_destination": o_automirror["default_destination"],
            "mirror_keep_snapshots": o_automirror["keep_snapshots"],
        }
        config = {**config, **config_automirror}
        # Idiom: membership test directly against the dict rather than
        # building a throwaway list of its keys
        if config["mirror_default_destination"] not in config["mirror_destinations"]:
            raise Exception(
                "Specified default mirror destination is not in the list of destinations"
            )
    except Exception as e:
        raise MalformedConfigurationError(e)
    return config
def get_automirror_configuration():
    """
    Return the parsed automirror configuration from the daemon's pvc.conf.
    """
    return get_parsed_automirror_configuration(get_configuration_path())
def validate_directories(config):
if not os.path.exists(config["dynamic_directory"]):
os.makedirs(config["dynamic_directory"])

View File

@ -2107,6 +2107,7 @@ def vm_worker_create_snapshot(
domain,
snapshot_name=None,
zk_only=False,
return_status=False,
):
if snapshot_name is None:
now = datetime.now()
@ -2124,27 +2125,34 @@ def vm_worker_create_snapshot(
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
message = (f"Could not find VM '{domain}' in the cluster",)
fail(celery, message)
if return_status:
return False, message
else:
return False
reg = re.compile("^[a-z0-9.-_]+$")
if not reg.match(snapshot_name):
fail(
celery,
message = (
"Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid))
if current_snapshots and snapshot_name in current_snapshots:
fail(
celery,
message = (
f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get the list of all RBD volumes
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
@ -2178,11 +2186,12 @@ def vm_worker_create_snapshot(
)
if not ret:
cleanup_failure()
fail(
celery,
msg.replace("ERROR: ", ""),
)
return False
message = (msg.replace("ERROR: ", ""),)
fail(celery, message)
if return_status:
return False, message
else:
return False
else:
snap_list.append(f"{pool}/{volume}@{snapshot_name}")
@ -2242,12 +2251,22 @@ def vm_worker_create_snapshot(
)
current_stage += 1
return finish(
celery,
f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",
current=current_stage,
total=total_stages,
)
message = (f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",)
if return_status:
finish(
celery,
message,
current=current_stage,
total=total_stages,
)
return True, message
else:
return finish(
celery,
message,
current=current_stage,
total=total_stages,
)
def vm_worker_remove_snapshot(
@ -3157,6 +3176,7 @@ def vm_worker_send_snapshot(
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
return_status=False,
):
current_stage = 0
@ -3171,11 +3191,12 @@ def vm_worker_send_snapshot(
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
message = (f"Could not find VM '{domain}' in the cluster",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get our side's VM configuration details
try:
@ -3184,31 +3205,34 @@ def vm_worker_send_snapshot(
vm_detail = None
if not isinstance(vm_detail, dict):
fail(
celery,
f"VM listing returned invalid data: {vm_detail}",
)
return False
message = (f"VM listing returned invalid data: {vm_detail}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
fail(celery, message)
if return_status:
return False, message
else:
return False
vm_name = vm_detail["name"]
@ -3234,23 +3258,26 @@ def vm_worker_send_snapshot(
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
message = (f"Connection to remote API timed out: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
message = (f"Connection to remote API is not valid: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
message = (f"Connection to remote API failed: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = session.get(
@ -3263,11 +3290,14 @@ def vm_worker_send_snapshot(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
message = (
"Connection to remote API failed: no PVC version information returned",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
expected_destination_pvc_version = "0.9.101"
# Work around development versions
@ -3278,11 +3308,14 @@ def vm_worker_send_snapshot(
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
message = (
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the VM already exists on the remote
response = session.get(
@ -3301,11 +3334,12 @@ def vm_worker_send_snapshot(
current_destination_vm_state is not None
and current_destination_vm_state != "mirror"
):
fail(
celery,
"Remote PVC VM exists and is not a mirror",
)
return False
message = ("Remote PVC VM exists and is not a mirror",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get details about VM snapshot
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
@ -3351,31 +3385,38 @@ def vm_worker_send_snapshot(
# Check if this snapshot is in the remote list already
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
message = (f"Snapshot {snapshot_name} already exists on the target",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if this snapshot is older than the latest remote VM snapshot
if (
len(destination_vm_snapshots) > 0
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
):
fail(
celery,
message = (
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
message = (
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Begin send, set stages
total_stages += 1 + (3 * len(snapshot_rbdsnaps))
@ -3393,6 +3434,25 @@ def vm_worker_send_snapshot(
"source_snapshot": incremental_parent,
}
# Strip out autobackup and automirror tags
# These should never be wanted on the receiving side
from daemon_lib.config import (
get_autobackup_configuration,
get_automirror_configuration,
)
autobackup_config = get_autobackup_configuration()
automirror_config = get_automirror_configuration()
new_tags = list()
for tag in vm_detail["tags"]:
tag_base = tag["name"].split(":")[0]
if tag_base in [t for t in autobackup_config["backup_tags"]] or tag_base in [
t for t in automirror_config["mirror_tags"]
]:
continue
new_tags.append(tag)
vm_detail["tags"] = new_tags
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
@ -3400,11 +3460,12 @@ def vm_worker_send_snapshot(
json=vm_detail,
)
if response.status_code != 200:
fail(
celery,
f"Failed to send config: {response.json()['message']}",
)
return False
message = (f"Failed to send config: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Create the block devices on the remote side if this is a new VM send
block_t_start = time.time()
@ -3431,11 +3492,12 @@ def vm_worker_send_snapshot(
error_message = f"Multiple details returned for volume {rbd_name}"
else:
error_message = f"Error getting details for volume {rbd_name}"
fail(
celery,
error_message,
)
return False
message = (error_message,)
fail(celery, message)
if return_status:
return False, message
else:
return False
try:
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
@ -3460,11 +3522,12 @@ def vm_worker_send_snapshot(
data=None,
)
if response.status_code != 404 and current_destination_vm_state is None:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
message = (f"Remote storage pool {pool} already contains volume {volume}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
if current_destination_vm_state is not None:
try:
@ -3474,7 +3537,10 @@ def vm_worker_send_snapshot(
except Exception as e:
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
fail(celery, error_message)
return False
if return_status:
return False, error_message
else:
return False
if local_volume_size != remote_volume_size:
response = session.put(
@ -3482,11 +3548,12 @@ def vm_worker_send_snapshot(
params={"new_size": local_volume_size, "force": True},
)
if response.status_code != 200:
fail(
celery,
"Failed to resize remote volume to match local volume",
)
return False
message = ("Failed to resize remote volume to match local volume",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Send the volume to the remote
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
@ -3557,11 +3624,14 @@ def vm_worker_send_snapshot(
stream=True,
)
if response.status_code != 200:
fail(
celery,
message = (
f"Failed to send diff batch: {response.json()['message']}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
current_chunk_time = time.time()
chunk_time = current_chunk_time - last_chunk_time
@ -3609,11 +3679,12 @@ def vm_worker_send_snapshot(
buffer.clear() # Clear the buffer after sending
buffer_size = 0 # Reset buffer size
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
message = (f"Failed to send snapshot: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3657,11 +3728,14 @@ def vm_worker_send_snapshot(
data=full_chunker(),
)
if response.status_code != 200:
fail(
celery,
message = (
f"Failed to send snapshot: {response.json()['message']}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3678,11 +3752,12 @@ def vm_worker_send_snapshot(
params=send_params,
)
if response.status_code != 200:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
message = (f"Failed to send snapshot: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3692,12 +3767,24 @@ def vm_worker_send_snapshot(
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
current_stage += 1
return finish(
celery,
message = (
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
current=current_stage,
total=total_stages,
)
if return_status:
finish(
celery,
message,
current=current_stage,
total=total_stages,
)
return True, message
else:
return finish(
celery,
message,
current=current_stage,
total=total_stages,
)
def vm_worker_create_mirror(