Compare commits

...

2 Commits

Author SHA1 Message Date
Joshua Boniface b525bbe81d Improve celery logging by including calling def 2024-11-15 11:49:32 -05:00
Joshua Boniface 078d48a50b Add VM automirror support
Allows shipping snapshots automatically to remote clusters on a cron,
identically to how autobackup handles local snapshot exports.

VMs are selected based on configured tags, and individual destination
clusters can be specified based on a colon-separated suffix to the
tag(s).

Automirror snapshots use the prefix "am" (analogous to "ab" for
autobackups) to differentiate them from normal "mr" mirrors.
2024-11-15 11:34:39 -05:00
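For illustration, a minimal sketch of how the colon-suffixed tags described above select destination clusters; the function name, tag values, and cluster names here are hypothetical and not part of the commit:

def resolve_destinations(vm_tags, mirror_tags, default_destination):
    # A bare mirror tag maps to the default destination; a "tag:cluster"
    # suffix maps to that named cluster instead.
    destinations = set()
    for tag in vm_tags:
        base, _, suffix = tag.partition(":")
        if base in mirror_tags:
            destinations.add(suffix or default_destination)
    return destinations

# A VM tagged "automirror" goes to the default destination; one tagged both
# "automirror:cluster2" and "automirror:cluster3" goes to both clusters.
print(resolve_destinations(["automirror:cluster2", "automirror:cluster3"], ["automirror"], "cluster2"))
# -> {'cluster2', 'cluster3'}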
9 changed files with 1026 additions and 128 deletions

View File

@ -4533,6 +4533,76 @@ class API_VM_Autobackup_Root(Resource):
api.add_resource(API_VM_Autobackup_Root, "/vm/autobackup")
# /vm/automirror
class API_VM_Automirror_Root(Resource):
@RequestParser(
[
{"name": "email_recipients"},
{
"name": "email_errors_only",
"required": False,
},
]
)
@Authenticator
def post(self, reqargs):
"""
Trigger a cluster automirror job
---
tags:
- provisioner
parameters:
- in: query
name: email_recipients
type: string
required: false
description: A list of email addresses to send failure and report emails to, comma-separated
- in: query
name: email_errors_only
type: boolean
required: false
default: false
description: If set and true, only sends a report email to email_recipients when there is an error with at least one mirror
responses:
202:
description: Accepted
schema:
type: object
description: The Celery job information of the task
id: CeleryTask
400:
description: Bad request
schema:
type: object
id: Message
"""
email_recipients = reqargs.get("email_recipients", None)
if email_recipients is not None and not isinstance(email_recipients, list):
email_recipients = [email_recipients]
email_errors_only = bool(strtobool(reqargs.get("email_errors_only", "False")))
task = run_celery_task(
"cluster.automirror",
email_recipients=email_recipients,
email_errors_only=email_errors_only,
run_on="primary",
)
return (
{
"task_id": task.id,
"task_name": "cluster.automirror",
"run_on": f"{get_primary_node()} (primary)",
},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Automirror_Root, "/vm/automirror")
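As a usage sketch (illustrative, not part of this diff; the cluster address and API key are hypothetical), the endpoint above could be triggered like this:

import requests

api_base = "https://pvc.cluster1.mydomain.tld:7370/api/v1"
headers = {"X-Api-Key": "00000000-0000-0000-0000-000000000000"}

resp = requests.post(
    f"{api_base}/vm/automirror",
    headers=headers,
    params={"email_recipients": "ops@example.com", "email_errors_only": "true"},
)
print(resp.status_code)          # 202 Accepted on success
print(resp.json()["task_id"])    # Celery task ID; poll the Location header for progress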
##########################################################
# Client API - Network
##########################################################

View File

@ -2577,6 +2577,89 @@ def cli_vm_autobackup(email_report, force_full_flag, wait_flag, cron_flag):
finish(retcode, retmsg)
###############################################################################
# > pvc vm automirror
###############################################################################
@click.command(
name="automirror", short_help="Perform automatic virtual machine mirrors."
)
@connection_req
@click.option(
"--email-report",
"email_report",
default=None,
help="Email a mirror summary report to the specified address(es), comma-separated.",
)
@click.option(
"--email-errors-only",
"email_errors_only_flag",
is_flag=True,
default=False,
show_default=True,
help="Only send a mirror summary report when at least one error occurrs.",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress if waiting.",
)
@click.option(
"--cron",
"cron_flag",
is_flag=True,
default=False,
show_default=True,
help="Run in cron mode (returns immediately with no output once job is submitted).",
)
def cli_vm_automirror(email_report, email_errors_only_flag, wait_flag, cron_flag):
"""
Perform automated mirrors of VMs, with integrated cleanup and full/incremental scheduling.
This command enables automatic mirrors of PVC VMs at the block level, leveraging the various "pvc vm snapshot"
functions with an internal retention and cleanup system. VMs and the destination cluster(s) are selected based
on configured VM tags and a set of static configs in the cluster's `pvc.conf` configuration.
This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines
how often mirrors are taken. Mirror retention is based only on the number of recorded mirrors on the remote side,
not on the time interval between them. Mirrors taken manually outside of the "automirror" command are not counted
towards the format or retention of automirrors.
WARNING: Running this command manually will interfere with the schedule! Do not run manually except for testing.
The actual details of the automirror, including retention policies, are defined in the main PVC configuration file
`/etc/pvc/pvc.conf`. See the sample configuration for more details.
An optional report on all current mirrors can be emailed to one or more email addresses using the
"--email-report" flag. This report will include information on all current known mirrors.
"""
if cron_flag:
wait_flag = False
if email_report is not None:
email_recipients = email_report.split(",")
else:
email_recipients = None
retcode, retmsg = pvc.lib.vm.vm_automirror(
CLI_CONFIG,
email_recipients=email_recipients,
email_errors_only_flag=email_errors_only_flag,
wait_flag=wait_flag,
)
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
if cron_flag:
finish(retcode, None)
else:
finish(retcode, retmsg)
###############################################################################
# > pvc vm tag
###############################################################################
@ -6918,6 +7001,7 @@ cli_vm_backup.add_command(cli_vm_backup_restore)
cli_vm_backup.add_command(cli_vm_backup_remove)
cli_vm.add_command(cli_vm_backup)
cli_vm.add_command(cli_vm_autobackup)
cli_vm.add_command(cli_vm_automirror)
cli_vm_tag.add_command(cli_vm_tag_get)
cli_vm_tag.add_command(cli_vm_tag_add)
cli_vm_tag.add_command(cli_vm_tag_remove)

View File

@ -714,6 +714,26 @@ def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_fla
return get_wait_retdata(response, wait_flag)
def vm_automirror(
config, email_recipients=None, email_errors_only_flag=False, wait_flag=True
):
"""
Perform a cluster VM automirror
API endpoint: POST /vm/automirror
API arguments: email_recipients=email_recipients, email_errors_only=email_errors_only_flag
API schema: {"message":"{data}"}
"""
params = {
"email_recipients": email_recipients,
"email_errors_only": email_errors_only_flag,
}
response = call_api(config, "post", "/vm/automirror", params=params)
return get_wait_retdata(response, wait_flag)
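A brief usage sketch of this client function (illustrative, not part of this diff; "config" stands in for a loaded PVC client connection configuration):

retcode, retdata = vm_automirror(
    config,
    email_recipients=["ops@example.com"],
    email_errors_only_flag=True,
    wait_flag=False,
)
# retcode indicates whether the job was submitted; retdata carries the Celery
# task information used by the CLI for progress tracking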
def vm_vcpus_set(config, vm, vcpus, topology, restart):
"""
Set the vCPU count of the VM with topology

daemon-common/automirror.py (new file, 496 lines)
View File

@ -0,0 +1,496 @@
#!/usr/bin/env python3
# automirror.py - PVC API Automirror functions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import requests
from datetime import datetime
from os import popen
from daemon_lib.config import get_automirror_configuration
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
import daemon_lib.vm as vm
def send_execution_failure_report(
celery, config, recipients=None, total_time=0, error=None
):
if recipients is None:
return
from email.utils import formatdate
from socket import gethostname
log_message = f"Sending email failure report to {', '.join(recipients)}"
log_info(celery, log_message)
current_datetime = datetime.now()
email_datetime = formatdate(float(current_datetime.strftime("%s")))
email = list()
email.append(f"Date: {email_datetime}")
email.append(
f"Subject: PVC Automirror execution failure for cluster '{config['cluster']}'"
)
email_to = list()
for recipient in recipients:
email_to.append(f"<{recipient}>")
email.append(f"To: {', '.join(email_to)}")
email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
email.append("")
email.append(
f"A PVC automirror has FAILED at {current_datetime} in {total_time}s due to an execution error."
)
email.append("")
email.append("The reported error message is:")
email.append(f" {error}")
try:
with popen("/usr/sbin/sendmail -t", "w") as p:
p.write("\n".join(email))
except Exception as e:
log_err(f"Failed to send report email: {e}")
def send_execution_summary_report(
celery,
config,
recipients=None,
total_time=0,
summary=dict(),
local_deleted_snapshots=dict(),
):
if recipients is None:
return
from email.utils import formatdate
from socket import gethostname
log_message = f"Sending email summary report to {', '.join(recipients)}"
log_info(celery, log_message)
current_datetime = datetime.now()
email_datetime = formatdate(float(current_datetime.strftime("%s")))
email = list()
email.append(f"Date: {email_datetime}")
email.append(f"Subject: PVC Automirror report for cluster '{config['cluster']}'")
email_to = list()
for recipient in recipients:
email_to.append(f"<{recipient}>")
email.append(f"To: {', '.join(email_to)}")
email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
email.append("")
email.append(
f"A PVC automirror has been completed at {current_datetime} in {total_time}."
)
email.append("")
email.append(
"The following is a summary of all VM mirror jobs executed during this run:"
)
email.append("")
vm_names = {k.split(":")[0] for k in summary.keys()}
for vm_name in vm_names:
email.append(f"VM {vm_name}:")
email.append(" Mirror jobs:")
for destination_name in {
k.split(":")[1] for k in summary.keys() if k.split(":")[0] == vm_name
}:
mirror = summary[f"{vm_name}:{destination_name}"]
datestring = mirror.get("snapshot_name").replace("am", "")
mirror_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
if mirror.get("result", False):
email.append(
f" * {mirror_date}: Success to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
)
else:
email.append(
f" * {mirror_date}: Failure to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
)
email.append(
f" {mirror.get('result_message')}"
)
email.append(
" The following aged-out local snapshots were removed during cleanup:"
)
for snapshot in local_deleted_snapshots[vm_name]:
email.append(f" * {snapshot}")
try:
with popen("/usr/sbin/sendmail -t", "w") as p:
p.write("\n".join(email))
except Exception as e:
log_err(f"Failed to send report email: {e}")
def run_vm_mirror(
zkhandler, celery, config, vm_detail, snapshot_name, destination_name
):
vm_name = vm_detail["name"]
keep_count = config["mirror_keep_snapshots"]
try:
destination = config["mirror_destinations"][destination_name]
except Exception:
error_message = f"Failed to find valid destination cluster '{destination_name}' for VM '{vm_name}'"
log_err(celery, error_message)
return error_message
destination_api_uri = f"{'https' if destination['ssl'] else 'http'}://{destination['address']}:{destination['port']}{destination['prefix']}"
destination_api_timeout = (3.05, 172800)
destination_api_headers = {
"X-Api-Key": destination["key"],
}
session = requests.Session()
session.headers.update(destination_api_headers)
session.verify = destination["verify_ssl"]
session.timeout = destination_api_timeout
# Get the last snapshot that is on the remote side for incrementals
response = session.get(
f"{destination_api_uri}/vm/{vm_name}",
params=None,
data=None,
)
destination_vm_detail = response.json()
if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
destination_vm_detail = destination_vm_detail[0]
try:
last_snapshot_name = [
s
for s in destination_vm_detail["snapshots"]
if s["name"].startswith("am")
][0]["name"]
except Exception:
last_snapshot_name = None
else:
last_snapshot_name = None
# Send the current snapshot
result, message = vm.vm_worker_send_snapshot(
zkhandler,
None,
vm_name,
snapshot_name,
destination_api_uri,
destination["key"],
destination_api_verify_ssl=destination["verify_ssl"],
incremental_parent=last_snapshot_name,
destination_storage_pool=destination["pool"],
return_status=True,
)
if not result:
return False, message
response = session.get(
f"{destination_api_uri}/vm/{vm_name}",
params=None,
data=None,
)
destination_vm_detail = response.json()
if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
destination_vm_detail = destination_vm_detail[0]
else:
message = "Remote VM somehow does not exist after successful mirror; skipping snapshot cleanup"
return False, message
# Find any mirror snapshots that are expired
remote_snapshots = [
s for s in destination_vm_detail["snapshots"] if s["name"].startswith("am")
]
# Snapshots are in dated descending order due to the names
if len(remote_snapshots) > keep_count:
remote_marked_for_deletion = [s["name"] for s in remote_snapshots[keep_count:]]
else:
remote_marked_for_deletion = list()
for snapshot in remote_marked_for_deletion:
log_info(
celery,
f"VM {vm_detail['name']} removing stale remote automirror snapshot {snapshot}",
)
session.delete(
f"{destination_api_uri}/vm/{vm_name}/snapshot",
params={
"snapshot_name": snapshot,
},
data=None,
)
session.close()
return True, remote_marked_for_deletion
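A quick illustration of the retention slice above, using hypothetical snapshot names (newest first, as the name-based descending order guarantees); this is not part of the diff:

remote_snapshot_names = ["am20241115013000", "am20241114013000", "am20241113013000"]
keep_count = 2
stale = remote_snapshot_names[keep_count:]  # everything beyond the newest keep_count
print(stale)  # ['am20241113013000']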
def worker_cluster_automirror(
zkhandler,
celery,
force_full=False,
email_recipients=None,
email_errors_only=False,
):
config = get_automirror_configuration()
mirror_summary = dict()
local_deleted_snapshots = dict()
current_stage = 0
total_stages = 1
start(
celery,
f"Starting cluster '{config['cluster']}' VM automirror",
current=current_stage,
total=total_stages,
)
if not config["automirror_enabled"]:
message = "Automirrors are not configured on this cluster."
log_info(celery, message)
return finish(
celery,
message,
current=total_stages,
total=total_stages,
)
if email_recipients is not None:
total_stages += 1
automirror_start_time = datetime.now()
retcode, vm_list = vm.get_list(zkhandler)
if not retcode:
error_message = f"Failed to fetch VM list: {vm_list}"
log_err(celery, error_message)
current_stage += 1
send_execution_failure_report(
celery,
config,
recipients=email_recipients,
error=error_message,
)
fail(celery, error_message)
return False
mirror_vms = list()
for vm_detail in vm_list:
mirror_vm = {
"detail": vm_detail,
"destinations": list(),
}
vm_tag_names = [t["name"] for t in vm_detail["tags"]]
# Check if any of the mirror tags are present; if they are, then we should mirror
vm_mirror_tags = list()
for tag in vm_tag_names:
if tag.split(":")[0] in config["mirror_tags"]:
vm_mirror_tags.append(tag)
# There are no mirror tags, so skip this VM
if len(vm_mirror_tags) < 1:
continue
# Go through each tag to extract the cluster
target_clusters = set()
for tag in vm_mirror_tags:
if len(tag.split(":")) == 1:
# This is a direct match without any cluster suffix, so use the default
target_clusters.add(config["mirror_default_destination"])
if len(tag.split(":")) > 1:
# This has a cluster suffix, so use that
target_clusters.add(tag.split(":")[1])
for cluster in target_clusters:
mirror_vm["destinations"].append(cluster)
mirror_vms.append(mirror_vm)
if len(mirror_vms) < 1:
message = "Found no VMs tagged for automirror."
log_info(celery, message)
return finish(
celery,
message,
current=total_stages,
total=total_stages,
)
total_stages += len(mirror_vms)
mirror_vm_names = set([b["detail"]["name"] for b in mirror_vms])
log_info(
celery,
f"Found {len(mirror_vm_names)} suitable VM(s) for automirror: {', '.join(mirror_vm_names)}",
)
# Execute the mirror: take a snapshot, then send it to each destination
for mirror_vm in mirror_vms:
vm_detail = mirror_vm["detail"]
vm_destinations = mirror_vm["destinations"]
current_stage += 1
update(
celery,
f"Performing automirror of VM {vm_detail['name']}",
current=current_stage,
total=total_stages,
)
# Automirrors use a custom name to allow them to be properly cleaned up later
now = datetime.now()
datestring = now.strftime("%Y%m%d%H%M%S")
snapshot_name = f"am{datestring}"
result, message = vm.vm_worker_create_snapshot(
zkhandler,
None,
vm_detail["name"],
snapshot_name=snapshot_name,
return_status=True,
)
if not result:
for destination in vm_destinations:
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
"result": result,
"snapshot_name": snapshot_name,
"runtime_secs": 0,
"result_message": message,
}
continue
remote_marked_for_deletion = dict()
for destination in vm_destinations:
mirror_start = datetime.now()
result, ret = run_vm_mirror(
zkhandler,
celery,
config,
vm_detail,
snapshot_name,
destination,
)
mirror_end = datetime.now()
runtime_secs = (mirror_end - mirror_start).seconds
if result:
remote_marked_for_deletion[destination] = ret
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
"result": result,
"snapshot_name": snapshot_name,
"runtime_secs": runtime_secs,
}
else:
log_warn(
celery,
f"Error in mirror send: {ret}",
)
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
"result": result,
"snapshot_name": snapshot_name,
"runtime_secs": runtime_secs,
"result_message": ret,
}
# Find all local snapshots that were present in all remote snapshot deletions,
# then remove them
# If one of the sends fails, this should result in nothing being removed
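# For example (hypothetical names): if cluster2 marked {"am20241113013000",
# "am20241112013000"} for deletion and cluster3 marked only {"am20241113013000"},
# the intersection leaves just "am20241113013000" eligible for local removal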
if remote_marked_for_deletion:
all_lists = [set(lst) for lst in remote_marked_for_deletion.values() if lst]
if all_lists:
local_marked_for_deletion = set.intersection(*all_lists)
else:
local_marked_for_deletion = set()
else:
local_marked_for_deletion = set()
for snapshot in local_marked_for_deletion:
log_info(
celery,
f"VM {vm_detail['name']} removing stale local automirror snapshot {snapshot}",
)
vm.vm_worker_remove_snapshot(
zkhandler,
None,
vm_detail["name"],
snapshot,
)
local_deleted_snapshots[vm_detail["name"]] = local_marked_for_deletion
automirror_end_time = datetime.now()
automirror_total_time = automirror_end_time - automirror_start_time
if email_recipients is not None:
current_stage += 1
if email_errors_only and not all(
[s["result"] for _, s in mirror_summary.items()]
):
# Send report if we're in errors only and at least one send failed
send_report = True
elif not email_errors_only:
# Send report if we're not in errors only
send_report = True
else:
# Otherwise (errors only and all successful) don't send
send_report = False
if send_report:
update(
celery,
"Sending automirror results summary email",
current=current_stage,
total=total_stages,
)
send_execution_summary_report(
celery,
config,
recipients=email_recipients,
total_time=automirror_total_time,
summary=mirror_summary,
local_deleted_snapshots=local_deleted_snapshots,
)
else:
update(
celery,
"Skipping automirror results summary email (no failures)",
current=current_stage,
total=total_stages,
)
current_stage += 1
return finish(
celery,
f"Successfully completed cluster '{config['cluster']}' VM automirror",
current=current_stage,
total=total_stages,
)

View File

@ -22,6 +22,7 @@
import sys
from inspect import stack
from logging import getLogger
from time import sleep
@ -32,7 +33,8 @@ class TaskFailure(Exception):
def start(celery, msg, current=0, total=1):
logger = getLogger(__name__)
logger.info(f"Starting {current}/{total}: {msg}")
caller_name = stack()[1].function
logger.info(f"Start {caller_name} {current}/{total}: {msg}")
if celery is None:
return
celery.update_state(
@ -42,13 +44,14 @@ def start(celery, msg, current=0, total=1):
def fail(celery, msg, exception=None, current=1, total=1):
caller_name = stack()[1].function
if exception is None:
exception = TaskFailure
msg = f"{type(exception()).__name__}: {msg}"
logger = getLogger(__name__)
logger.error(msg)
logger.error(f"Fail {caller_name} {current}/{total}: {msg}")
sys.tracebacklimit = 0
raise exception(msg)
@ -56,22 +59,26 @@ def fail(celery, msg, exception=None, current=1, total=1):
def log_info(celery, msg):
logger = getLogger(__name__)
logger.info(f"Task log: {msg}")
caller_name = stack()[1].function
logger.info(f"Log {caller_name}: {msg}")
def log_warn(celery, msg):
logger = getLogger(__name__)
logger.warning(f"Task log: {msg}")
caller_name = stack()[1].function
logger.warning(f"Log {caller_name}: {msg}")
def log_err(celery, msg):
logger = getLogger(__name__)
logger.error(f"Task log: {msg}")
caller_name = stack()[1].function
logger.error(f"Log {caller_name}: {msg}")
def update(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: {msg}")
caller_name = stack()[1].function
logger.info(f"Update {caller_name} {current}/{total}: {msg}")
if celery is None:
return
celery.update_state(
@ -82,7 +89,8 @@ def update(celery, msg, current=1, total=2):
def finish(celery, msg, current=2, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: Finishing up")
caller_name = stack()[1].function
logger.info(f"Update {caller_name} {current}/{total}: Finishing up")
if celery is None:
return
celery.update_state(
@ -90,5 +98,5 @@ def finish(celery, msg, current=2, total=2):
meta={"current": current, "total": total, "status": "Finishing up"},
)
sleep(1)
logger.info(f"Success {current}/{total}: {msg}")
logger.info(f"Success {caller_name} {current}/{total}: {msg}")
return {"status": msg, "current": current, "total": total}

View File

@ -481,6 +481,64 @@ def get_autobackup_configuration():
return config
def get_parsed_automirror_configuration(config_file):
"""
Load the configuration; this is the same main pvc.conf that the daemons read
"""
print('Loading configuration from file "{}"'.format(config_file))
with open(config_file, "r") as cfgfh:
try:
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
except Exception as e:
print(f"ERROR: Failed to parse configuration file: {e}")
os._exit(1)
config = dict()
try:
o_cluster = o_config["cluster"]
config_cluster = {
"cluster": o_cluster["name"],
"automirror_enabled": True,
}
config = {**config, **config_cluster}
o_automirror = o_config["automirror"]
if o_automirror is None:
config["automirror_enabled"] = False
return config
config_automirror = {
"mirror_tags": o_automirror["mirror_tags"],
"mirror_destinations": o_automirror["destinations"],
"mirror_default_destination": o_automirror["default_destination"],
"mirror_keep_snapshots": o_automirror["keep_snapshots"],
}
config = {**config, **config_automirror}
if config["mirror_default_destination"] not in [
d for d in config["mirror_destinations"].keys()
]:
raise Exception(
"Specified default mirror destination is not in the list of destinations"
)
except Exception as e:
raise MalformedConfigurationError(e)
return config
def get_automirror_configuration():
"""
Get the configuration.
"""
pvc_config_file = get_configuration_path()
config = get_parsed_automirror_configuration(pvc_config_file)
return config
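For reference, with the sample pvc.conf automirror section shown later in this diff, the parsed dictionary would look roughly like this (the cluster name is hypothetical; not part of the diff):

parsed = {
    "cluster": "cluster1",  # cluster.name from pvc.conf
    "automirror_enabled": True,
    "mirror_tags": ["automirror"],
    "mirror_destinations": {
        "cluster2": {
            "address": "pvc.cluster2.mydomain.tld",
            "port": 7370,
            "prefix": "/api/v1",
            "key": "00000000-0000-0000-0000-000000000000",
            "ssl": True,
            "verify_ssl": True,
            "pool": "vms",
        },
    },
    "mirror_default_destination": "cluster2",
    "mirror_keep_snapshots": 7,
}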
def validate_directories(config):
if not os.path.exists(config["dynamic_directory"]):
os.makedirs(config["dynamic_directory"])

View File

@ -2107,6 +2107,7 @@ def vm_worker_create_snapshot(
domain,
snapshot_name=None,
zk_only=False,
return_status=False,
):
if snapshot_name is None:
now = datetime.now()
@ -2124,27 +2125,34 @@ def vm_worker_create_snapshot(
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
message = (f"Could not find VM '{domain}' in the cluster",)
fail(celery, message)
if return_status:
return False, message
else:
return False
reg = re.compile("^[a-z0-9.-_]+$")
if not reg.match(snapshot_name):
fail(
celery,
message = (
"Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid))
if current_snapshots and snapshot_name in current_snapshots:
fail(
celery,
message = (
f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get the list of all RBD volumes
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
@ -2178,11 +2186,12 @@ def vm_worker_create_snapshot(
)
if not ret:
cleanup_failure()
fail(
celery,
msg.replace("ERROR: ", ""),
)
return False
message = (msg.replace("ERROR: ", ""),)
fail(celery, message)
if return_status:
return False, message
else:
return False
else:
snap_list.append(f"{pool}/{volume}@{snapshot_name}")
@ -2242,12 +2251,22 @@ def vm_worker_create_snapshot(
)
current_stage += 1
return finish(
celery,
f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",
current=current_stage,
total=total_stages,
)
message = (f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",)
if return_status:
finish(
celery,
message,
current=current_stage,
total=total_stages,
)
return True, message
else:
return finish(
celery,
message,
current=current_stage,
total=total_stages,
)
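The return_status flag added here (and to vm_worker_send_snapshot below) lets the automirror worker call these functions directly and receive a (result, message) pair instead of relying solely on Celery task termination. A condensed, self-contained sketch of that convention (names are illustrative, not part of the diff):

def _fail(msg):
    print(f"FAIL: {msg}")  # stand-in for the Celery fail() helper

def _finish(msg):
    print(f"DONE: {msg}")  # stand-in for the Celery finish() helper
    return {"status": msg}

def worker_op(value, return_status=False):
    if value is None:
        message = "No value given"
        _fail(message)
        # Tuple for programmatic callers, bare False for the legacy Celery path
        return (False, message) if return_status else False
    message = f"Processed {value}"
    result = _finish(message)
    return (True, message) if return_status else result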
def vm_worker_remove_snapshot(
@ -3157,6 +3176,7 @@ def vm_worker_send_snapshot(
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
return_status=False,
):
current_stage = 0
@ -3171,11 +3191,12 @@ def vm_worker_send_snapshot(
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
message = (f"Could not find VM '{domain}' in the cluster",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get our side's VM configuration details
try:
@ -3184,31 +3205,34 @@ def vm_worker_send_snapshot(
vm_detail = None
if not isinstance(vm_detail, dict):
fail(
celery,
f"VM listing returned invalid data: {vm_detail}",
)
return False
message = (f"VM listing returned invalid data: {vm_detail}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
fail(celery, message)
if return_status:
return False, message
else:
return False
vm_name = vm_detail["name"]
@ -3234,23 +3258,26 @@ def vm_worker_send_snapshot(
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
message = (f"Connection to remote API timed out: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
message = (f"Connection to remote API is not valid: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
message = (f"Connection to remote API failed: {e}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = session.get(
@ -3263,11 +3290,14 @@ def vm_worker_send_snapshot(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
message = (
"Connection to remote API failed: no PVC version information returned",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
expected_destination_pvc_version = "0.9.101"
# Work around development versions
@ -3278,11 +3308,14 @@ def vm_worker_send_snapshot(
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
message = (
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if the VM already exists on the remote
response = session.get(
@ -3301,11 +3334,12 @@ def vm_worker_send_snapshot(
current_destination_vm_state is not None
and current_destination_vm_state != "mirror"
):
fail(
celery,
"Remote PVC VM exists and is not a mirror",
)
return False
message = ("Remote PVC VM exists and is not a mirror",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Get details about VM snapshot
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
@ -3351,31 +3385,38 @@ def vm_worker_send_snapshot(
# Check if this snapshot is in the remote list already
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
message = (f"Snapshot {snapshot_name} already exists on the target",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check if this snapshot is older than the latest remote VM snapshot
if (
len(destination_vm_snapshots) > 0
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
):
fail(
celery,
message = (
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
fail(
celery,
message = (
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
# Begin send, set stages
total_stages += 1 + (3 * len(snapshot_rbdsnaps))
@ -3393,6 +3434,25 @@ def vm_worker_send_snapshot(
"source_snapshot": incremental_parent,
}
# Strip out autobackup and automirror tags
# These should never be wanted on the receiving side
from daemon_lib.config import (
get_autobackup_configuration,
get_automirror_configuration,
)
autobackup_config = get_autobackup_configuration()
automirror_config = get_automirror_configuration()
new_tags = list()
for tag in vm_detail["tags"]:
tag_base = tag["name"].split(":")[0]
if tag_base in [t for t in autobackup_config["backup_tags"]] or tag_base in [
t for t in automirror_config["mirror_tags"]
]:
continue
new_tags.append(tag)
vm_detail["tags"] = new_tags
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
@ -3400,11 +3460,12 @@ def vm_worker_send_snapshot(
json=vm_detail,
)
if response.status_code != 200:
fail(
celery,
f"Failed to send config: {response.json()['message']}",
)
return False
message = (f"Failed to send config: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Create the block devices on the remote side if this is a new VM send
block_t_start = time.time()
@ -3431,11 +3492,12 @@ def vm_worker_send_snapshot(
error_message = f"Multiple details returned for volume {rbd_name}"
else:
error_message = f"Error getting details for volume {rbd_name}"
fail(
celery,
error_message,
)
return False
message = (error_message,)
fail(celery, message)
if return_status:
return False, message
else:
return False
try:
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
@ -3460,11 +3522,12 @@ def vm_worker_send_snapshot(
data=None,
)
if response.status_code != 404 and current_destination_vm_state is None:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
message = (f"Remote storage pool {pool} already contains volume {volume}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
if current_destination_vm_state is not None:
try:
@ -3474,7 +3537,10 @@ def vm_worker_send_snapshot(
except Exception as e:
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
fail(celery, error_message)
return False
if return_status:
return False, error_message
else:
return False
if local_volume_size != remote_volume_size:
response = session.put(
@ -3482,11 +3548,12 @@ def vm_worker_send_snapshot(
params={"new_size": local_volume_size, "force": True},
)
if response.status_code != 200:
fail(
celery,
"Failed to resize remote volume to match local volume",
)
return False
message = ("Failed to resize remote volume to match local volume",)
fail(celery, message)
if return_status:
return False, message
else:
return False
# Send the volume to the remote
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
@ -3557,11 +3624,14 @@ def vm_worker_send_snapshot(
stream=True,
)
if response.status_code != 200:
fail(
celery,
message = (
f"Failed to send diff batch: {response.json()['message']}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
current_chunk_time = time.time()
chunk_time = current_chunk_time - last_chunk_time
@ -3609,11 +3679,12 @@ def vm_worker_send_snapshot(
buffer.clear() # Clear the buffer after sending
buffer_size = 0 # Reset buffer size
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
message = (f"Failed to send snapshot: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3657,11 +3728,14 @@ def vm_worker_send_snapshot(
data=full_chunker(),
)
if response.status_code != 200:
fail(
celery,
message = (
f"Failed to send snapshot: {response.json()['message']}",
)
return False
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3678,11 +3752,12 @@ def vm_worker_send_snapshot(
params=send_params,
)
if response.status_code != 200:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
message = (f"Failed to send snapshot: {response.json()['message']}",)
fail(celery, message)
if return_status:
return False, message
else:
return False
finally:
image.close()
ioctx.close()
@ -3692,12 +3767,24 @@ def vm_worker_send_snapshot(
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
current_stage += 1
return finish(
celery,
message = (
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
current=current_stage,
total=total_stages,
)
if return_status:
finish(
celery,
message,
current=current_stage,
total=total_stages,
)
return True, message
else:
return finish(
celery,
message,
current=current_stage,
total=total_stages,
)
def vm_worker_create_mirror(

View File

@ -393,6 +393,8 @@ api:
private_key: ""
# Automatic backups
# If this section is present, autobackups will be enabled; otherwise, they will be disabled.
# The pvc-ansible roles manage this, including the various timer units, so avoid adjusting this manually.
autobackup:
# Backup root path on the node, used as the remote mountpoint
@ -451,5 +453,55 @@ autobackup:
# This example shows a generic umount leveraging the backup_root_path variable
- "/usr/bin/umount {backup_root_path}"
# Automatic mirroring to peer clusters
# If this section is present, automirrors will be enabled; otherwise, they will be disabled.
# The pvc-ansible roles manage this, including the various timer units, so avoid adjusting this manually.
automirror:
# Destination clusters
# A list of destination cluster API endpoints to send mirrors to.
# For each entry, the "name" field will be mapped to the "{cluster}" variable in the tag(s)
# defined below. For more details on how exactly this works, please consult the documentation.
destinations:
# An example entry; contains the same information as a "pvc connection" entry
# The key in this dictionary is the "name" of the cluster, which is what must be suffixed
# to a tag and is displayed in the report and status output.
cluster2:
# The destination address, either an IP or an FQDN at which the destination API is reachable
address: pvc.cluster2.mydomain.tld
# The destination port (usually 7370)
port: 7370
# The API prefix (usually '/api/v1') without a trailing slash
prefix: "/api/v1"
# The API key of the destination
key: 00000000-0000-0000-0000-000000000000
# Whether or not to use SSL for the connection
ssl: yes
# Whether or not to verify SSL for the connection
verify_ssl: yes
# Storage pool for VMs on the destination
pool: vms
# Default destination
# The cluster name to send mirrors to for VMs without an explicit "{cluster}" tag
# Always required, even if there is only a single destination
default_destination: cluster2
# VM tag(s) to mirror
# Only VMs with at least one of the given tag(s) will be mirrored; all others will be skipped
# All mirror tags support suffixing a ":{cluster}" argument, which will override the default
# cluster and send mirrors to the given cluster name (in the destinations list above). Multiple suffixed
# tags are supported; if more than one is, the VM will be mirrored to all specified clusters.
mirror_tags:
- "automirror"
# The number of snapshots to keep, on both sides - mirror snapshots older than the last
# X snapshots will be automatically removed to save space
# Depending on the interval specified in the pvc-ansible variables, this may be either a
# relatively short or relatively long time.
keep_snapshots: 7
# VIM modeline, requires "set modeline" in your VIMRC
# vim: expandtab shiftwidth=2 tabstop=2 filetype=yaml

View File

@ -53,6 +53,9 @@ from daemon_lib.vmbuilder import (
from daemon_lib.autobackup import (
worker_cluster_autobackup,
)
from daemon_lib.automirror import (
worker_cluster_automirror,
)
# Daemon version
version = "0.9.103"
@ -122,6 +125,26 @@ def cluster_autobackup(self, force_full=False, email_recipients=None, run_on="pr
)
@celery.task(name="cluster.automirror", bind=True, routing_key="run_on")
def cluster_automirror(
self, email_recipients=None, email_errors_only=False, run_on="primary"
):
@ZKConnection(config)
def run_cluster_automirror(
zkhandler, self, email_recipients=None, email_errors_only=False
):
return worker_cluster_automirror(
zkhandler,
self,
email_recipients=email_recipients,
email_errors_only=email_errors_only,
)
return run_cluster_automirror(
self, email_recipients=email_recipients, email_errors_only=email_errors_only
)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)