Port VM autobackups into pvcworkerd with snaps
Moves VM autobackups out of the CLI and into the pvcworkerd system on the primary coordinator, turning the CLI autobackup command into a true API client endpoint rather than embedding the logic in the CLI itself. In addition, the new autobackup leverages the new "pvc vm snapshot" function set, simply using specially named snapshots, which automates the process within the new snapshot scaffolding.
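As a rough sketch of the resulting flow, the pvcworkerd side would wrap the new worker_cluster_autobackup() function (added below) in a Celery task; the task name, Celery application object, and ZKHandler setup here are illustrative assumptions rather than part of this commit:

# Hypothetical pvcworkerd task wrapper; "celery_app", the task name, and the
# ZKHandler connection handling are assumptions for illustration only.
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.autobackup import worker_cluster_autobackup


@celery_app.task(name="cluster.autobackup", bind=True)
def cluster_autobackup(self, force_full=False, email_recipients=None):
    zkhandler = ZKHandler(config)  # assumes the daemon config is in scope
    zkhandler.connect()
    try:
        # The bound task instance acts as the "celery" progress handle
        return worker_cluster_autobackup(
            zkhandler,
            self,
            force_full=force_full,
            email_recipients=email_recipients,
        )
    finally:
        zkhandler.disconnect()

The CLI's autobackup command would then submit this task through the API (e.g. with celery_app.send_task) and poll its progress, rather than executing the backup logic client-side.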
daemon-common/autobackup.py | 479 lines | new file
@@ -0,0 +1,479 @@
#!/usr/bin/env python3

# autobackup.py - PVC API Autobackup functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

from datetime import datetime
from json import load as jload
from json import dump as jdump
from os import popen, makedirs, path
from shutil import rmtree
from subprocess import run, PIPE

from daemon_lib.config import get_autobackup_configuration
from daemon_lib.celery import start, fail, log_info, log_err, update, finish

import daemon_lib.ceph as pvc_ceph
import daemon_lib.vm as pvc_vm

def send_execution_failure_report(
    celery_conf, config, recipients=None, total_time=0, error=None
):
    if recipients is None:
        return

    from email.utils import formatdate
    from socket import gethostname

    log_message = f"Sending email failure report to {', '.join(recipients)}"
    # celery_conf is a (celery, current_stage, total_stages) tuple; pass the
    # celery object through to the logger and progress updater
    log_info(celery_conf[0], log_message)
    update(
        celery_conf[0],
        log_message,
        current=celery_conf[1] + 1,
        total=celery_conf[2],
    )

    current_datetime = datetime.now()
    email_datetime = formatdate(float(current_datetime.strftime("%s")))

    email = list()
    email.append(f"Date: {email_datetime}")
    email.append(
        f"Subject: PVC Autobackup execution failure for cluster '{config['cluster']}'"
    )

    email_to = list()
    for recipient in recipients:
        email_to.append(f"<{recipient}>")

    email.append(f"To: {', '.join(email_to)}")
    email.append(f"From: PVC Autobackup System <pvc@{gethostname()}>")
    email.append("")

    email.append(
        f"A PVC autobackup has FAILED at {current_datetime} in {total_time}s due to an execution error."
    )
    email.append("")
    email.append("The reported error message is:")
    email.append(f"    {error}")

    try:
        with popen("/usr/sbin/sendmail -t", "w") as p:
            p.write("\n".join(email))
    except Exception as e:
        log_err(celery_conf[0], f"Failed to send report email: {e}")

def send_execution_summary_report(
    celery_conf, config, recipients=None, total_time=0, summary=dict()
):
    if recipients is None:
        return

    from email.utils import formatdate
    from socket import gethostname

    log_message = f"Sending email summary report to {', '.join(recipients)}"
    log_info(celery_conf[0], log_message)
    update(
        celery_conf[0],
        log_message,
        current=celery_conf[1] + 1,
        total=celery_conf[2],
    )

    current_datetime = datetime.now()
    email_datetime = formatdate(float(current_datetime.strftime("%s")))

    email = list()
    email.append(f"Date: {email_datetime}")
    email.append(f"Subject: PVC Autobackup report for cluster '{config['cluster']}'")

    email_to = list()
    for recipient in recipients:
        email_to.append(f"<{recipient}>")

    email.append(f"To: {', '.join(email_to)}")
    email.append(f"From: PVC Autobackup System <pvc@{gethostname()}>")
    email.append("")

    email.append(
        f"A PVC autobackup has been completed at {current_datetime} in {total_time}s."
    )
    email.append("")
    email.append(
        "The following is a summary of all current VM backups after cleanups, most recent first:"
    )
    email.append("")

    for vm in summary.keys():
        email.append(f"VM: {vm}:")
        for backup in summary[vm]:
            datestring = backup.get("datestring")
            backup_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
            if backup.get("result", False):
                email.append(
                    f"    {backup_date}: Success in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
                )
                email.append(
                    f"        Backup contains {len(backup.get('backup_files', []))} files totaling {pvc_ceph.format_bytes_tohuman(backup.get('backup_size_bytes', 0))} ({backup.get('backup_size_bytes', 0)} bytes)"
                )
            else:
                email.append(
                    f"    {backup_date}: Failure in {backup.get('runtime_secs', 0)} seconds, ID {datestring}, type {backup.get('type', 'unknown')}"
                )
                email.append(f"        {backup.get('result_message')}")

    try:
        with popen("/usr/sbin/sendmail -t", "w") as p:
            p.write("\n".join(email))
    except Exception as e:
        log_err(celery_conf[0], f"Failed to send report email: {e}")

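# For reference, each entry of summary[<vm_name>] (and of the per-VM
# tracked_backups list below) is the parsed snapshot.json written during
# export; judging from the fields read above, a record presumably looks
# something like this sketch:
#
#     {
#         "type": "full",                  # or "incremental"
#         "datestring": "20240315120000",
#         "snapshot_name": "autobackup_20240315120000",
#         "incremental_parent": None,      # parent datestring for incrementals
#         "result": True,
#         "result_message": "",
#         "runtime_secs": 42,
#         "backup_files": [],
#         "backup_size_bytes": 1073741824,
#     }
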
def worker_cluster_autobackup(
    zkhandler, celery, force_full=False, email_recipients=None
):
    config = get_autobackup_configuration()

    backup_summary = dict()

    current_stage = 0
    total_stages = 1
    if email_recipients is not None:
        total_stages += 1

    start(
        celery,
        f"Starting cluster '{config['cluster']}' VM autobackup",
        current=current_stage,
        total=total_stages,
    )

    if not config["autobackup_enabled"]:
        message = "Autobackups are not configured on this cluster."
        log_info(celery, message)
        return finish(
            celery,
            message,
            current=total_stages,
            total=total_stages,
        )

    autobackup_start_time = datetime.now()

    retcode, vm_list = pvc_vm.get_list(zkhandler)
    if not retcode:
        error_message = f"Failed to fetch VM list: {vm_list}"
        log_err(celery, error_message)
        send_execution_failure_report(
            (celery, current_stage, total_stages),
            config,
            recipients=email_recipients,
            error=error_message,
        )
        fail(celery, error_message)
        return False

    backup_vms = list()
    for vm in vm_list:
        vm_tag_names = [t["name"] for t in vm["tags"]]
        matching_tags = (
            len(set(vm_tag_names).intersection(set(config["backup_tags"]))) > 0
        )
        if matching_tags:
            backup_vms.append(vm)

    if len(backup_vms) < 1:
        message = "Found no VMs tagged for autobackup."
        log_info(celery, message)
        return finish(
            celery,
            message,
            current=total_stages,
            total=total_stages,
        )

    if config["auto_mount_enabled"]:
        total_stages += len(config["mount_cmds"])
        total_stages += len(config["unmount_cmds"])
    for vm in backup_vms:
        total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"])
        # Per-VM stage budget: the constants presumably mirror the stage
        # counts reported by the snapshot create/export/remove workers plus
        # the bookkeeping stages below, with 3 stages per RBD disk
        total_stages += 2 + 1 + 2 + 2 + 3 * total_disks

    log_info(
        celery,
        f"Found {len(backup_vms)} suitable VM(s) for autobackup: {', '.join([b['name'] for b in backup_vms])}",
    )

    # Handle automount mount commands
    if config["auto_mount_enabled"]:
        for cmd in config["mount_cmds"]:
            current_stage += 1
            update(
                celery,
                f"Executing mount command '{cmd.split()[0]}'",
                current=current_stage,
                total=total_stages,
            )

            ret = run(
                cmd.split(),
                stdout=PIPE,
                stderr=PIPE,
            )

            if ret.returncode != 0:
                error_message = f"Failed to execute mount command '{cmd.split()[0]}': {ret.stderr.decode().strip()}"
                log_err(celery, error_message)
                send_execution_failure_report(
                    (celery, current_stage, total_stages),
                    config,
                    recipients=email_recipients,
                    total_time=datetime.now() - autobackup_start_time,
                    error=error_message,
                )
                fail(celery, error_message)
                return False

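    # For orientation, the on-disk layout assembled below from the configured
    # paths (a sketch based on the path strings used in this function):
    #
    #     {backup_root_path}/{backup_root_suffix}/
    #         {vm_name}/
    #             .autobackup.json                 <- tracked-backup state
    #             {snapshot_name}/
    #                 snapshot.json                <- per-export details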
    # Execute the backup: take a snapshot, then export the snapshot
    backup_suffixed_path = (
        f"{config['backup_root_path']}/{config['backup_root_suffix']}"
    )
    if not path.exists(backup_suffixed_path):
        makedirs(backup_suffixed_path)

    full_interval = config["backup_schedule"]["full_interval"]
    full_retention = config["backup_schedule"]["full_retention"]

    for vm in backup_vms:
        vm_name = vm["name"]
        vm_backup_path = f"{backup_suffixed_path}/{vm_name}"
        autobackup_state_file = f"{vm_backup_path}/.autobackup.json"
        if not path.exists(vm_backup_path) or not path.exists(autobackup_state_file):
            # There are no existing backups so the list is empty
            state_data = dict()
            tracked_backups = list()
        else:
            with open(autobackup_state_file) as fh:
                state_data = jload(fh)
            tracked_backups = state_data["tracked_backups"]

        full_backups = [b for b in tracked_backups if b["type"] == "full"]
        if len(full_backups) > 0:
            last_full_backup = full_backups[0]
            last_full_backup_idx = tracked_backups.index(last_full_backup)
            if force_full:
                this_backup_incremental_parent = None
                this_backup_retain_snapshot = True
            elif last_full_backup_idx >= full_interval - 1:
                this_backup_incremental_parent = None
                this_backup_retain_snapshot = True
            else:
                this_backup_incremental_parent = last_full_backup["datestring"]
                this_backup_retain_snapshot = False
        else:
            # The very first backup must be full to start the tree
            this_backup_incremental_parent = None
            this_backup_retain_snapshot = True

        now = datetime.now()
        datestring = now.strftime("%Y%m%d%H%M%S")
        snapshot_name = f"autobackup_{datestring}"

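        # Worked example of the scheduling above, assuming full_interval = 7:
        # tracked_backups is ordered most-recent-first, so the index of the
        # most recent full backup equals the number of backups taken since
        # it. At indices 0-5 an incremental is taken against that full; at
        # index 6 (the 7th backup) a new full is taken, and its snapshot is
        # retained so that subsequent incrementals can be sent against it.
        # The snapshot create/export/remove workers invoked below report
        # their own progress through the override_*_stages arguments.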
        # Take the snapshot
        ret = pvc_vm.vm_worker_create_snapshot(
            zkhandler,
            celery,
            vm_name,
            snapshot_name=snapshot_name,
            override_current_stage=current_stage,
            override_total_stages=total_stages,
        )
        if ret is False:
            error_message = f"Failed to create backup snapshot '{snapshot_name}'"
            log_err(celery, error_message)
            send_execution_failure_report(
                (celery, current_stage, total_stages),
                config,
                recipients=email_recipients,
                error=error_message,
            )
            return False

        # Export the snapshot
        ret = pvc_vm.vm_worker_export_snapshot(
            zkhandler,
            celery,
            vm_name,
            snapshot_name,
            backup_suffixed_path,
            incremental_parent=this_backup_incremental_parent,
            override_current_stage=current_stage,
            override_total_stages=total_stages,
        )
        if ret is False:
            error_message = f"Failed to export backup snapshot '{snapshot_name}'"
            log_err(celery, error_message)
            send_execution_failure_report(
                (celery, current_stage, total_stages),
                config,
                recipients=email_recipients,
                error=error_message,
            )
            return False

        # Clean up the snapshot
        if not this_backup_retain_snapshot:
            ret = pvc_vm.vm_worker_remove_snapshot(
                zkhandler,
                celery,
                vm_name,
                snapshot_name,
                override_current_stage=current_stage,
                override_total_stages=total_stages,
            )
            if ret is False:
                error_message = f"Failed to remove backup snapshot '{snapshot_name}'"
                log_err(celery, error_message)
                send_execution_failure_report(
                    (celery, current_stage, total_stages),
                    config,
                    recipients=email_recipients,
                    error=error_message,
                )
                return False
        else:
            # The snapshot is retained (as an incremental parent), so advance
            # the stage counter past the removal stages budgeted for this VM
            total_disks = len([d for d in vm["disks"] if d["type"] == "rbd"])
            current_stage += 2 + total_disks

        current_stage += 1
        update(
            celery,
            f"Finding obsolete incremental backups for '{vm_name}'",
            current=current_stage,
            total=total_stages,
        )

        # Read export file to get details
        backup_json_file = f"{vm_backup_path}/{snapshot_name}/snapshot.json"
        with open(backup_json_file) as fh:
            backup_json = jload(fh)
        tracked_backups.insert(0, backup_json)

        marked_for_deletion = list()
        # Find any full backups that are expired
        found_full_count = 0
        for backup in tracked_backups:
            if backup["type"] == "full":
                found_full_count += 1
                if found_full_count > full_retention:
                    marked_for_deletion.append(backup)
        # Find any incremental backups that depend on marked parents
        for backup in tracked_backups:
            if backup["type"] == "incremental" and backup["incremental_parent"] in [
                b["datestring"] for b in marked_for_deletion
            ]:
                marked_for_deletion.append(backup)

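        # Worked example of the retention pass above, assuming
        # full_retention = 2: scanning tracked_backups newest-first, the
        # first two full backups found are kept; the third and any older
        # fulls are marked for deletion, along with every incremental whose
        # incremental_parent points at a marked full.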
        current_stage += 1
        if len(marked_for_deletion) > 0:
            update(
                celery,
                f"Cleaning up aged out backups for '{vm_name}'",
                current=current_stage,
                total=total_stages,
            )

        for backup_to_delete in marked_for_deletion:
            ret = pvc_vm.vm_worker_remove_snapshot(
                zkhandler, None, vm_name, backup_to_delete["snapshot_name"]
            )
            if ret is False:
                error_message = f"Failed to remove obsolete backup snapshot '{backup_to_delete['snapshot_name']}', leaving in tracked backups"
                log_err(celery, error_message)
            else:
                rmtree(f"{vm_backup_path}/{backup_to_delete['snapshot_name']}")
                tracked_backups.remove(backup_to_delete)

        current_stage += 1
        update(
            celery,
            "Updating tracked backups",
            current=current_stage,
            total=total_stages,
        )
        state_data["tracked_backups"] = tracked_backups
        with open(autobackup_state_file, "w") as fh:
            jdump(state_data, fh)

        # Key the summary by VM name (the report functions expect names)
        backup_summary[vm_name] = tracked_backups

    # Handle automount unmount commands
    if config["auto_mount_enabled"]:
        for cmd in config["unmount_cmds"]:
            current_stage += 1
            update(
                celery,
                f"Executing unmount command '{cmd.split()[0]}'",
                current=current_stage,
                total=total_stages,
            )

            ret = run(
                cmd.split(),
                stdout=PIPE,
                stderr=PIPE,
            )

            if ret.returncode != 0:
                error_message = f"Failed to execute unmount command '{cmd.split()[0]}': {ret.stderr.decode().strip()}"
                log_err(celery, error_message)
                send_execution_failure_report(
                    (celery, current_stage, total_stages),
                    config,
                    recipients=email_recipients,
                    total_time=datetime.now() - autobackup_start_time,
                    error=error_message,
                )
                fail(celery, error_message)
                return False

    autobackup_end_time = datetime.now()
    autobackup_total_time = autobackup_end_time - autobackup_start_time

    send_execution_summary_report(
        (celery, current_stage, total_stages),
        config,
        recipients=email_recipients,
        total_time=autobackup_total_time,
        summary=backup_summary,
    )

    current_stage += 1
    return finish(
        celery,
        f"Successfully completed cluster '{config['cluster']}' VM autobackup",
        current=current_stage,
        total=total_stages,
    )
daemon-common/config.py
@@ -406,6 +406,78 @@ def get_configuration():
    return config


def get_parsed_autobackup_configuration(config_file):
    """
    Load the configuration; this is the same main pvc.conf that the daemons read
    """
    print('Loading configuration from file "{}"'.format(config_file))

    with open(config_file, "r") as cfgfh:
        try:
            o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
        except Exception as e:
            print(f"ERROR: Failed to parse configuration file: {e}")
            os._exit(1)

    config = dict()

    try:
        o_cluster = o_config["cluster"]
        config_cluster = {
            "cluster": o_cluster["name"],
            "autobackup_enabled": True,
        }
        config = {**config, **config_cluster}

        o_autobackup = o_config["autobackup"]
        if o_autobackup is None:
            config["autobackup_enabled"] = False
            return config

        config_autobackup = {
            "backup_root_path": o_autobackup["backup_root_path"],
            "backup_root_suffix": o_autobackup["backup_root_suffix"],
            "backup_tags": o_autobackup["backup_tags"],
            "backup_schedule": o_autobackup["backup_schedule"],
        }
        config = {**config, **config_autobackup}

        o_automount = o_autobackup["auto_mount"]
        config_automount = {
            "auto_mount_enabled": o_automount["enabled"],
        }
        config = {**config, **config_automount}
        if config["auto_mount_enabled"]:
            config["mount_cmds"] = list()
            for _mount_cmd in o_automount["mount_cmds"]:
                if "{backup_root_path}" in _mount_cmd:
                    _mount_cmd = _mount_cmd.format(
                        backup_root_path=config["backup_root_path"]
                    )
                config["mount_cmds"].append(_mount_cmd)
            config["unmount_cmds"] = list()
            for _unmount_cmd in o_automount["unmount_cmds"]:
                if "{backup_root_path}" in _unmount_cmd:
                    _unmount_cmd = _unmount_cmd.format(
                        backup_root_path=config["backup_root_path"]
                    )
                config["unmount_cmds"].append(_unmount_cmd)

    except Exception as e:
        raise MalformedConfigurationError(e)

    return config

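# A minimal sketch of the pvc.conf sections this parser expects, inferred
# from the key accesses above; all values are illustrative only:
#
#     cluster:
#       name: cluster1
#     autobackup:
#       backup_root_path: "/mnt/backups"
#       backup_root_suffix: "pvc-autobackup"
#       backup_tags:
#         - "autobackup"
#       backup_schedule:
#         full_interval: 7
#         full_retention: 2
#       auto_mount:
#         enabled: true
#         mount_cmds:
#           - "/usr/bin/mount /dev/disk/by-uuid/X {backup_root_path}"
#         unmount_cmds:
#           - "/usr/bin/umount {backup_root_path}"
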
def get_autobackup_configuration():
    """
    Get the configuration.
    """
    pvc_config_file = get_configuration_path()
    config = get_parsed_autobackup_configuration(pvc_config_file)
    return config

def validate_directories(config):
    if not os.path.exists(config["dynamic_directory"]):
        os.makedirs(config["dynamic_directory"])