Compare commits
No commits in common. "b525bbe81d96f050677893c91042f9f6ebc0f0ae" and "cebc660fb041a36bdc26923bc16ba411a469b022" have entirely different histories.
b525bbe81d
...
cebc660fb0
@ -4533,76 +4533,6 @@ class API_VM_Autobackup_Root(Resource):
|
|||||||
api.add_resource(API_VM_Autobackup_Root, "/vm/autobackup")
|
api.add_resource(API_VM_Autobackup_Root, "/vm/autobackup")
|
||||||
|
|
||||||
|
|
||||||
# /vm/automirror
|
|
||||||
class API_VM_Automirror_Root(Resource):
|
|
||||||
@RequestParser(
|
|
||||||
[
|
|
||||||
{"name": "email_recipients"},
|
|
||||||
{
|
|
||||||
"name": "email_errors_only",
|
|
||||||
"required": False,
|
|
||||||
},
|
|
||||||
]
|
|
||||||
)
|
|
||||||
@Authenticator
|
|
||||||
def post(self, reqargs):
|
|
||||||
"""
|
|
||||||
Trigger a cluster automirror job
|
|
||||||
---
|
|
||||||
tags:
|
|
||||||
- provisioner
|
|
||||||
parameters:
|
|
||||||
- in: query
|
|
||||||
name: email_recipients
|
|
||||||
type: string
|
|
||||||
required: false
|
|
||||||
description: A list of email addresses to send failure and report emails to, comma-separated
|
|
||||||
- in: query
|
|
||||||
name: email_errors_only
|
|
||||||
type: boolean
|
|
||||||
required: false
|
|
||||||
default: false
|
|
||||||
description: If set and true, only sends a report email to email_recipients when there is an error with at least one mirror
|
|
||||||
responses:
|
|
||||||
202:
|
|
||||||
description: Accepted
|
|
||||||
schema:
|
|
||||||
type: object
|
|
||||||
description: The Celery job information of the task
|
|
||||||
id: CeleryTask
|
|
||||||
400:
|
|
||||||
description: Bad request
|
|
||||||
schema:
|
|
||||||
type: object
|
|
||||||
id: Message
|
|
||||||
"""
|
|
||||||
|
|
||||||
email_recipients = reqargs.get("email_recipients", None)
|
|
||||||
if email_recipients is not None and not isinstance(email_recipients, list):
|
|
||||||
email_recipients = [email_recipients]
|
|
||||||
|
|
||||||
email_errors_only = bool(strtobool(reqargs.get("email_errors_only", "False")))
|
|
||||||
|
|
||||||
task = run_celery_task(
|
|
||||||
"cluster.automirror",
|
|
||||||
email_recipients=email_recipients,
|
|
||||||
email_errors_only=email_errors_only,
|
|
||||||
run_on="primary",
|
|
||||||
)
|
|
||||||
return (
|
|
||||||
{
|
|
||||||
"task_id": task.id,
|
|
||||||
"task_name": "cluster.automirror",
|
|
||||||
"run_on": f"{get_primary_node()} (primary)",
|
|
||||||
},
|
|
||||||
202,
|
|
||||||
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
api.add_resource(API_VM_Automirror_Root, "/vm/automirror")
|
|
||||||
|
|
||||||
|
|
||||||
##########################################################
|
##########################################################
|
||||||
# Client API - Network
|
# Client API - Network
|
||||||
##########################################################
|
##########################################################
|
||||||
|
@ -2577,89 +2577,6 @@ def cli_vm_autobackup(email_report, force_full_flag, wait_flag, cron_flag):
|
|||||||
finish(retcode, retmsg)
|
finish(retcode, retmsg)
|
||||||
|
|
||||||
|
|
||||||
###############################################################################
|
|
||||||
# > pvc vm automirror
|
|
||||||
###############################################################################
|
|
||||||
@click.command(
|
|
||||||
name="automirror", short_help="Perform automatic virtual machine mirrors."
|
|
||||||
)
|
|
||||||
@connection_req
|
|
||||||
@click.option(
|
|
||||||
"--email-report",
|
|
||||||
"email_report",
|
|
||||||
default=None,
|
|
||||||
help="Email a mirror summary report to the specified address(es), comma-separated.",
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--email-errors-only",
|
|
||||||
"email_errors_only_flag",
|
|
||||||
is_flag=True,
|
|
||||||
default=False,
|
|
||||||
show_default=True,
|
|
||||||
help="Only send a mirror summary report when at least one error occurrs.",
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--wait/--no-wait",
|
|
||||||
"wait_flag",
|
|
||||||
is_flag=True,
|
|
||||||
default=True,
|
|
||||||
show_default=True,
|
|
||||||
help="Wait or don't wait for task to complete, showing progress if waiting.",
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--cron",
|
|
||||||
"cron_flag",
|
|
||||||
is_flag=True,
|
|
||||||
default=False,
|
|
||||||
show_default=True,
|
|
||||||
help="Run in cron mode (returns immediately with no output once job is submitted).",
|
|
||||||
)
|
|
||||||
def cli_vm_automirror(email_report, email_errors_only_flag, wait_flag, cron_flag):
|
|
||||||
"""
|
|
||||||
Perform automated mirrors of VMs, with integrated cleanup and full/incremental scheduling.
|
|
||||||
|
|
||||||
This command enables automatic mirrors of PVC VMs at the block level, leveraging the various "pvc vm snapshot"
|
|
||||||
functions with an internal rentention and cleanup system. VMs and the destination cluster(s) are selected based
|
|
||||||
on configured VM tags and a set of static configs in the cluster's `pvc.conf` configuration.
|
|
||||||
|
|
||||||
This command should be run from cron or a timer at a regular interval (e.g. daily, hourly, etc.) which defines
|
|
||||||
how often mirrors are taken. Mirror retention is based only on the number of recorded mirrors on the remote side,
|
|
||||||
not on the time interval between them. Mirrors taken manually outside of the "automirror" command are not counted
|
|
||||||
towards the format or retention of automirrors.
|
|
||||||
|
|
||||||
WARNING: Running this command manually will interfere with the schedule! Do not run manually except for testing.
|
|
||||||
|
|
||||||
The actual details of the automirror, including retention policies, are defined in the main PVC configuration file
|
|
||||||
`/etc/pvc/pvc.conf`. See the sample configuration for more details.
|
|
||||||
|
|
||||||
An optional report on all current mirrors can be emailed to one or more email addresses using the
|
|
||||||
"--email-report" flag. This report will include information on all current known mirrors.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if cron_flag:
|
|
||||||
wait_flag = False
|
|
||||||
|
|
||||||
if email_report is not None:
|
|
||||||
email_recipients = email_report.split(",")
|
|
||||||
else:
|
|
||||||
email_recipients = None
|
|
||||||
|
|
||||||
retcode, retmsg = pvc.lib.vm.vm_automirror(
|
|
||||||
CLI_CONFIG,
|
|
||||||
email_recipients=email_recipients,
|
|
||||||
email_errors_only_flag=email_errors_only_flag,
|
|
||||||
wait_flag=wait_flag,
|
|
||||||
)
|
|
||||||
|
|
||||||
if retcode and wait_flag:
|
|
||||||
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
|
|
||||||
|
|
||||||
if cron_flag:
|
|
||||||
finish(retcode, None)
|
|
||||||
else:
|
|
||||||
finish(retcode, retmsg)
|
|
||||||
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# > pvc vm tag
|
# > pvc vm tag
|
||||||
###############################################################################
|
###############################################################################
|
||||||
@ -7001,7 +6918,6 @@ cli_vm_backup.add_command(cli_vm_backup_restore)
|
|||||||
cli_vm_backup.add_command(cli_vm_backup_remove)
|
cli_vm_backup.add_command(cli_vm_backup_remove)
|
||||||
cli_vm.add_command(cli_vm_backup)
|
cli_vm.add_command(cli_vm_backup)
|
||||||
cli_vm.add_command(cli_vm_autobackup)
|
cli_vm.add_command(cli_vm_autobackup)
|
||||||
cli_vm.add_command(cli_vm_automirror)
|
|
||||||
cli_vm_tag.add_command(cli_vm_tag_get)
|
cli_vm_tag.add_command(cli_vm_tag_get)
|
||||||
cli_vm_tag.add_command(cli_vm_tag_add)
|
cli_vm_tag.add_command(cli_vm_tag_add)
|
||||||
cli_vm_tag.add_command(cli_vm_tag_remove)
|
cli_vm_tag.add_command(cli_vm_tag_remove)
|
||||||
|
@ -714,26 +714,6 @@ def vm_autobackup(config, email_recipients=None, force_full_flag=False, wait_fla
|
|||||||
return get_wait_retdata(response, wait_flag)
|
return get_wait_retdata(response, wait_flag)
|
||||||
|
|
||||||
|
|
||||||
def vm_automirror(
|
|
||||||
config, email_recipients=None, email_errors_only_flag=False, wait_flag=True
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Perform a cluster VM automirror
|
|
||||||
|
|
||||||
API endpoint: POST /vm//automirror
|
|
||||||
API arguments: email_recipients=email_recipients, email_errors_only=email_errors_only_flag
|
|
||||||
API schema: {"message":"{data}"}
|
|
||||||
"""
|
|
||||||
params = {
|
|
||||||
"email_recipients": email_recipients,
|
|
||||||
"email_errors_only": email_errors_only_flag,
|
|
||||||
}
|
|
||||||
|
|
||||||
response = call_api(config, "post", "/vm/automirror", params=params)
|
|
||||||
|
|
||||||
return get_wait_retdata(response, wait_flag)
|
|
||||||
|
|
||||||
|
|
||||||
def vm_vcpus_set(config, vm, vcpus, topology, restart):
|
def vm_vcpus_set(config, vm, vcpus, topology, restart):
|
||||||
"""
|
"""
|
||||||
Set the vCPU count of the VM with topology
|
Set the vCPU count of the VM with topology
|
||||||
|
@ -1,496 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# automirror.py - PVC API Automirror functions
|
|
||||||
# Part of the Parallel Virtual Cluster (PVC) system
|
|
||||||
#
|
|
||||||
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, version 3.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
#
|
|
||||||
###############################################################################
|
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
from os import popen
|
|
||||||
|
|
||||||
from daemon_lib.config import get_automirror_configuration
|
|
||||||
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
|
|
||||||
|
|
||||||
import daemon_lib.vm as vm
|
|
||||||
|
|
||||||
|
|
||||||
def send_execution_failure_report(
|
|
||||||
celery, config, recipients=None, total_time=0, error=None
|
|
||||||
):
|
|
||||||
if recipients is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
from email.utils import formatdate
|
|
||||||
from socket import gethostname
|
|
||||||
|
|
||||||
log_message = f"Sending email failure report to {', '.join(recipients)}"
|
|
||||||
log_info(celery, log_message)
|
|
||||||
|
|
||||||
current_datetime = datetime.now()
|
|
||||||
email_datetime = formatdate(float(current_datetime.strftime("%s")))
|
|
||||||
|
|
||||||
email = list()
|
|
||||||
email.append(f"Date: {email_datetime}")
|
|
||||||
email.append(
|
|
||||||
f"Subject: PVC Automirror execution failure for cluster '{config['cluster']}'"
|
|
||||||
)
|
|
||||||
|
|
||||||
email_to = list()
|
|
||||||
for recipient in recipients:
|
|
||||||
email_to.append(f"<{recipient}>")
|
|
||||||
|
|
||||||
email.append(f"To: {', '.join(email_to)}")
|
|
||||||
email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
|
|
||||||
email.append("")
|
|
||||||
|
|
||||||
email.append(
|
|
||||||
f"A PVC automirror has FAILED at {current_datetime} in {total_time}s due to an execution error."
|
|
||||||
)
|
|
||||||
email.append("")
|
|
||||||
email.append("The reported error message is:")
|
|
||||||
email.append(f" {error}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
with popen("/usr/sbin/sendmail -t", "w") as p:
|
|
||||||
p.write("\n".join(email))
|
|
||||||
except Exception as e:
|
|
||||||
log_err(f"Failed to send report email: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def send_execution_summary_report(
|
|
||||||
celery,
|
|
||||||
config,
|
|
||||||
recipients=None,
|
|
||||||
total_time=0,
|
|
||||||
summary=dict(),
|
|
||||||
local_deleted_snapshots=dict(),
|
|
||||||
):
|
|
||||||
if recipients is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
from email.utils import formatdate
|
|
||||||
from socket import gethostname
|
|
||||||
|
|
||||||
log_message = f"Sending email summary report to {', '.join(recipients)}"
|
|
||||||
log_info(celery, log_message)
|
|
||||||
|
|
||||||
current_datetime = datetime.now()
|
|
||||||
email_datetime = formatdate(float(current_datetime.strftime("%s")))
|
|
||||||
|
|
||||||
email = list()
|
|
||||||
email.append(f"Date: {email_datetime}")
|
|
||||||
email.append(f"Subject: PVC Automirror report for cluster '{config['cluster']}'")
|
|
||||||
|
|
||||||
email_to = list()
|
|
||||||
for recipient in recipients:
|
|
||||||
email_to.append(f"<{recipient}>")
|
|
||||||
|
|
||||||
email.append(f"To: {', '.join(email_to)}")
|
|
||||||
email.append(f"From: PVC Automirror System <pvc@{gethostname()}>")
|
|
||||||
email.append("")
|
|
||||||
|
|
||||||
email.append(
|
|
||||||
f"A PVC automirror has been completed at {current_datetime} in {total_time}."
|
|
||||||
)
|
|
||||||
email.append("")
|
|
||||||
email.append(
|
|
||||||
"The following is a summary of all VM mirror jobs executed during this run:"
|
|
||||||
)
|
|
||||||
email.append("")
|
|
||||||
|
|
||||||
vm_names = {k.split(":")[0] for k in summary.keys()}
|
|
||||||
for vm_name in vm_names:
|
|
||||||
email.append(f"VM {vm_name}:")
|
|
||||||
email.append(" Mirror jobs:")
|
|
||||||
for destination_name in {
|
|
||||||
k.split(":")[1] for k in summary.keys() if k.split(":")[0] == vm_name
|
|
||||||
}:
|
|
||||||
mirror = summary[f"{vm_name}:{destination_name}"]
|
|
||||||
datestring = mirror.get("snapshot_name").replace("am", "")
|
|
||||||
mirror_date = datetime.strptime(datestring, "%Y%m%d%H%M%S")
|
|
||||||
if mirror.get("result", False):
|
|
||||||
email.append(
|
|
||||||
f" * {mirror_date}: Success to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
email.append(
|
|
||||||
f" * {mirror_date}: Failure to cluster {destination_name} in {mirror.get('runtime_secs', 0)} seconds, ID {mirror.get('snapshot_name')}"
|
|
||||||
)
|
|
||||||
email.append(
|
|
||||||
f" {mirror.get('result_message')}"
|
|
||||||
)
|
|
||||||
|
|
||||||
email.append(
|
|
||||||
" The following aged-out local snapshots were removed during cleanup:"
|
|
||||||
)
|
|
||||||
for snapshot in local_deleted_snapshots[vm_name]:
|
|
||||||
email.append(f" * {snapshot}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
with popen("/usr/sbin/sendmail -t", "w") as p:
|
|
||||||
p.write("\n".join(email))
|
|
||||||
except Exception as e:
|
|
||||||
log_err(f"Failed to send report email: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def run_vm_mirror(
|
|
||||||
zkhandler, celery, config, vm_detail, snapshot_name, destination_name
|
|
||||||
):
|
|
||||||
vm_name = vm_detail["name"]
|
|
||||||
keep_count = config["mirror_keep_snapshots"]
|
|
||||||
|
|
||||||
try:
|
|
||||||
destination = config["mirror_destinations"][destination_name]
|
|
||||||
except Exception:
|
|
||||||
error_message = f"Failed to find valid destination cluster '{destination_name}' for VM '{vm_name}'"
|
|
||||||
log_err(celery, error_message)
|
|
||||||
return error_message
|
|
||||||
|
|
||||||
destination_api_uri = f"{'https' if destination['ssl'] else 'http'}://{destination['address']}:{destination['port']}{destination['prefix']}"
|
|
||||||
destination_api_timeout = (3.05, 172800)
|
|
||||||
destination_api_headers = {
|
|
||||||
"X-Api-Key": destination["key"],
|
|
||||||
}
|
|
||||||
|
|
||||||
session = requests.Session()
|
|
||||||
session.headers.update(destination_api_headers)
|
|
||||||
session.verify = destination["verify_ssl"]
|
|
||||||
session.timeout = destination_api_timeout
|
|
||||||
|
|
||||||
# Get the last snapshot that is on the remote side for incrementals
|
|
||||||
response = session.get(
|
|
||||||
f"{destination_api_uri}/vm/{vm_name}",
|
|
||||||
params=None,
|
|
||||||
data=None,
|
|
||||||
)
|
|
||||||
destination_vm_detail = response.json()
|
|
||||||
if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
|
|
||||||
destination_vm_detail = destination_vm_detail[0]
|
|
||||||
try:
|
|
||||||
last_snapshot_name = [
|
|
||||||
s
|
|
||||||
for s in destination_vm_detail["snapshots"]
|
|
||||||
if s["name"].startswith("am")
|
|
||||||
][0]["name"]
|
|
||||||
except Exception:
|
|
||||||
last_snapshot_name = None
|
|
||||||
else:
|
|
||||||
last_snapshot_name = None
|
|
||||||
|
|
||||||
# Send the current snapshot
|
|
||||||
result, message = vm.vm_worker_send_snapshot(
|
|
||||||
zkhandler,
|
|
||||||
None,
|
|
||||||
vm_name,
|
|
||||||
snapshot_name,
|
|
||||||
destination_api_uri,
|
|
||||||
destination["key"],
|
|
||||||
destination_api_verify_ssl=destination["verify_ssl"],
|
|
||||||
incremental_parent=last_snapshot_name,
|
|
||||||
destination_storage_pool=destination["pool"],
|
|
||||||
return_status=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not result:
|
|
||||||
return False, message
|
|
||||||
|
|
||||||
response = session.get(
|
|
||||||
f"{destination_api_uri}/vm/{vm_name}",
|
|
||||||
params=None,
|
|
||||||
data=None,
|
|
||||||
)
|
|
||||||
destination_vm_detail = response.json()
|
|
||||||
if type(destination_vm_detail) is list and len(destination_vm_detail) > 0:
|
|
||||||
destination_vm_detail = destination_vm_detail[0]
|
|
||||||
else:
|
|
||||||
message = "Remote VM somehow does not exist after successful mirror; skipping snapshot cleanup"
|
|
||||||
return False, message
|
|
||||||
|
|
||||||
# Find any mirror snapshots that are expired
|
|
||||||
remote_snapshots = [
|
|
||||||
s for s in destination_vm_detail["snapshots"] if s["name"].startswith("am")
|
|
||||||
]
|
|
||||||
|
|
||||||
# Snapshots are in dated descending order due to the names
|
|
||||||
if len(remote_snapshots) > keep_count:
|
|
||||||
remote_marked_for_deletion = [s["name"] for s in remote_snapshots[keep_count:]]
|
|
||||||
else:
|
|
||||||
remote_marked_for_deletion = list()
|
|
||||||
|
|
||||||
for snapshot in remote_marked_for_deletion:
|
|
||||||
log_info(
|
|
||||||
celery,
|
|
||||||
f"VM {vm_detail['name']} removing stale remote automirror snapshot {snapshot}",
|
|
||||||
)
|
|
||||||
session.delete(
|
|
||||||
f"{destination_api_uri}/vm/{vm_name}/snapshot",
|
|
||||||
params={
|
|
||||||
"snapshot_name": snapshot,
|
|
||||||
},
|
|
||||||
data=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
return True, remote_marked_for_deletion
|
|
||||||
|
|
||||||
|
|
||||||
def worker_cluster_automirror(
|
|
||||||
zkhandler,
|
|
||||||
celery,
|
|
||||||
force_full=False,
|
|
||||||
email_recipients=None,
|
|
||||||
email_errors_only=False,
|
|
||||||
):
|
|
||||||
config = get_automirror_configuration()
|
|
||||||
|
|
||||||
mirror_summary = dict()
|
|
||||||
local_deleted_snapshots = dict()
|
|
||||||
|
|
||||||
current_stage = 0
|
|
||||||
total_stages = 1
|
|
||||||
|
|
||||||
start(
|
|
||||||
celery,
|
|
||||||
f"Starting cluster '{config['cluster']}' VM automirror",
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not config["automirror_enabled"]:
|
|
||||||
message = "Automirrors are not configured on this cluster."
|
|
||||||
log_info(celery, message)
|
|
||||||
return finish(
|
|
||||||
celery,
|
|
||||||
message,
|
|
||||||
current=total_stages,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
if email_recipients is not None:
|
|
||||||
total_stages += 1
|
|
||||||
|
|
||||||
automirror_start_time = datetime.now()
|
|
||||||
|
|
||||||
retcode, vm_list = vm.get_list(zkhandler)
|
|
||||||
if not retcode:
|
|
||||||
error_message = f"Failed to fetch VM list: {vm_list}"
|
|
||||||
log_err(celery, error_message)
|
|
||||||
current_stage += 1
|
|
||||||
send_execution_failure_report(
|
|
||||||
celery,
|
|
||||||
config,
|
|
||||||
recipients=email_recipients,
|
|
||||||
error=error_message,
|
|
||||||
)
|
|
||||||
fail(celery, error_message)
|
|
||||||
return False
|
|
||||||
|
|
||||||
mirror_vms = list()
|
|
||||||
for vm_detail in vm_list:
|
|
||||||
mirror_vm = {
|
|
||||||
"detail": vm_detail,
|
|
||||||
"destinations": list(),
|
|
||||||
}
|
|
||||||
vm_tag_names = [t["name"] for t in vm_detail["tags"]]
|
|
||||||
# Check if any of the mirror tags are present; if they are, then we should mirror
|
|
||||||
vm_mirror_tags = list()
|
|
||||||
for tag in vm_tag_names:
|
|
||||||
if tag.split(":")[0] in config["mirror_tags"]:
|
|
||||||
vm_mirror_tags.append(tag)
|
|
||||||
|
|
||||||
# There are no mirror tags, so skip this VM
|
|
||||||
if len(vm_mirror_tags) < 1:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Go through each tag to extract the cluster
|
|
||||||
target_clusters = set()
|
|
||||||
for tag in vm_mirror_tags:
|
|
||||||
if len(tag.split(":")) == 1:
|
|
||||||
# This is a direct match without any cluster suffix, so use the default
|
|
||||||
target_clusters.add(config["mirror_default_destination"])
|
|
||||||
if len(tag.split(":")) > 1:
|
|
||||||
# This has a cluster suffix, so use that
|
|
||||||
target_clusters.add(tag.split(":")[1])
|
|
||||||
|
|
||||||
for cluster in target_clusters:
|
|
||||||
mirror_vm["destinations"].append(cluster)
|
|
||||||
|
|
||||||
mirror_vms.append(mirror_vm)
|
|
||||||
|
|
||||||
if len(mirror_vms) < 1:
|
|
||||||
message = "Found no VMs tagged for automirror."
|
|
||||||
log_info(celery, message)
|
|
||||||
return finish(
|
|
||||||
celery,
|
|
||||||
message,
|
|
||||||
current=total_stages,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
total_stages += len(mirror_vms)
|
|
||||||
|
|
||||||
mirror_vm_names = set([b["detail"]["name"] for b in mirror_vms])
|
|
||||||
|
|
||||||
log_info(
|
|
||||||
celery,
|
|
||||||
f"Found {len(mirror_vm_names)} suitable VM(s) for automirror: {', '.join(mirror_vm_names)}",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Execute the backup: take a snapshot, then export the snapshot
|
|
||||||
for mirror_vm in mirror_vms:
|
|
||||||
vm_detail = mirror_vm["detail"]
|
|
||||||
vm_destinations = mirror_vm["destinations"]
|
|
||||||
|
|
||||||
current_stage += 1
|
|
||||||
update(
|
|
||||||
celery,
|
|
||||||
f"Performing automirror of VM {vm_detail['name']}",
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Automirrors use a custom name to allow them to be properly cleaned up later
|
|
||||||
now = datetime.now()
|
|
||||||
datestring = now.strftime("%Y%m%d%H%M%S")
|
|
||||||
snapshot_name = f"am{datestring}"
|
|
||||||
|
|
||||||
result, message = vm.vm_worker_create_snapshot(
|
|
||||||
zkhandler,
|
|
||||||
None,
|
|
||||||
vm_detail["name"],
|
|
||||||
snapshot_name=snapshot_name,
|
|
||||||
return_status=True,
|
|
||||||
)
|
|
||||||
if not result:
|
|
||||||
for destination in vm_destinations:
|
|
||||||
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
|
|
||||||
"result": result,
|
|
||||||
"snapshot_name": snapshot_name,
|
|
||||||
"runtime_secs": 0,
|
|
||||||
"result_message": message,
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
|
|
||||||
remote_marked_for_deletion = dict()
|
|
||||||
for destination in vm_destinations:
|
|
||||||
mirror_start = datetime.now()
|
|
||||||
result, ret = run_vm_mirror(
|
|
||||||
zkhandler,
|
|
||||||
celery,
|
|
||||||
config,
|
|
||||||
vm_detail,
|
|
||||||
snapshot_name,
|
|
||||||
destination,
|
|
||||||
)
|
|
||||||
mirror_end = datetime.now()
|
|
||||||
runtime_secs = (mirror_end - mirror_start).seconds
|
|
||||||
if result:
|
|
||||||
remote_marked_for_deletion[destination] = ret
|
|
||||||
|
|
||||||
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
|
|
||||||
"result": result,
|
|
||||||
"snapshot_name": snapshot_name,
|
|
||||||
"runtime_secs": runtime_secs,
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
log_warn(
|
|
||||||
celery,
|
|
||||||
f"Error in mirror send: {ret}",
|
|
||||||
)
|
|
||||||
mirror_summary[f"{vm_detail['name']}:{destination}"] = {
|
|
||||||
"result": result,
|
|
||||||
"snapshot_name": snapshot_name,
|
|
||||||
"runtime_secs": runtime_secs,
|
|
||||||
"result_message": ret,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Find all local snapshots that were present in all remote snapshot deletions,
|
|
||||||
# then remove them
|
|
||||||
# If one of the sends fails, this should result in nothing being removed
|
|
||||||
if remote_marked_for_deletion:
|
|
||||||
all_lists = [set(lst) for lst in remote_marked_for_deletion.values() if lst]
|
|
||||||
if all_lists:
|
|
||||||
local_marked_for_deletion = set.intersection(*all_lists)
|
|
||||||
else:
|
|
||||||
local_marked_for_deletion = set()
|
|
||||||
else:
|
|
||||||
local_marked_for_deletion = set()
|
|
||||||
|
|
||||||
for snapshot in local_marked_for_deletion:
|
|
||||||
log_info(
|
|
||||||
celery,
|
|
||||||
f"VM {vm_detail['name']} removing stale local automirror snapshot {snapshot}",
|
|
||||||
)
|
|
||||||
vm.vm_worker_remove_snapshot(
|
|
||||||
zkhandler,
|
|
||||||
None,
|
|
||||||
vm_detail["name"],
|
|
||||||
snapshot,
|
|
||||||
)
|
|
||||||
|
|
||||||
local_deleted_snapshots[vm_detail["name"]] = local_marked_for_deletion
|
|
||||||
|
|
||||||
automirror_end_time = datetime.now()
|
|
||||||
automirror_total_time = automirror_end_time - automirror_start_time
|
|
||||||
|
|
||||||
if email_recipients is not None:
|
|
||||||
current_stage += 1
|
|
||||||
if email_errors_only and not all(
|
|
||||||
[s["result"] for _, s in mirror_summary.items()]
|
|
||||||
):
|
|
||||||
# Send report if we're in errors only and at least one send failed
|
|
||||||
send_report = True
|
|
||||||
elif not email_errors_only:
|
|
||||||
# Send report if we're not in errors only
|
|
||||||
send_report = True
|
|
||||||
else:
|
|
||||||
# Otherwise (errors only and all successful) don't send
|
|
||||||
send_report = False
|
|
||||||
|
|
||||||
if send_report:
|
|
||||||
update(
|
|
||||||
celery,
|
|
||||||
"Sending automirror results summary email",
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
send_execution_summary_report(
|
|
||||||
celery,
|
|
||||||
config,
|
|
||||||
recipients=email_recipients,
|
|
||||||
total_time=automirror_total_time,
|
|
||||||
summary=mirror_summary,
|
|
||||||
local_deleted_snapshots=local_deleted_snapshots,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
update(
|
|
||||||
celery,
|
|
||||||
"Skipping automirror results summary email (no failures)",
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
current_stage += 1
|
|
||||||
return finish(
|
|
||||||
celery,
|
|
||||||
f"Successfully completed cluster '{config['cluster']}' VM automirror",
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
@ -22,7 +22,6 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from inspect import stack
|
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
@ -33,8 +32,7 @@ class TaskFailure(Exception):
|
|||||||
|
|
||||||
def start(celery, msg, current=0, total=1):
|
def start(celery, msg, current=0, total=1):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.info(f"Starting {current}/{total}: {msg}")
|
||||||
logger.info(f"Start {caller_name} {current}/{total}: {msg}")
|
|
||||||
if celery is None:
|
if celery is None:
|
||||||
return
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
@ -44,14 +42,13 @@ def start(celery, msg, current=0, total=1):
|
|||||||
|
|
||||||
|
|
||||||
def fail(celery, msg, exception=None, current=1, total=1):
|
def fail(celery, msg, exception=None, current=1, total=1):
|
||||||
caller_name = stack()[1].function
|
|
||||||
if exception is None:
|
if exception is None:
|
||||||
exception = TaskFailure
|
exception = TaskFailure
|
||||||
|
|
||||||
msg = f"{type(exception()).__name__}: {msg}"
|
msg = f"{type(exception()).__name__}: {msg}"
|
||||||
|
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
logger.error(f"Fail {caller_name} {current}/{total}: {msg}")
|
logger.error(msg)
|
||||||
|
|
||||||
sys.tracebacklimit = 0
|
sys.tracebacklimit = 0
|
||||||
raise exception(msg)
|
raise exception(msg)
|
||||||
@ -59,26 +56,22 @@ def fail(celery, msg, exception=None, current=1, total=1):
|
|||||||
|
|
||||||
def log_info(celery, msg):
|
def log_info(celery, msg):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.info(f"Task log: {msg}")
|
||||||
logger.info(f"Log {caller_name}: {msg}")
|
|
||||||
|
|
||||||
|
|
||||||
def log_warn(celery, msg):
|
def log_warn(celery, msg):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.warning(f"Task log: {msg}")
|
||||||
logger.warning(f"Log {caller_name}: {msg}")
|
|
||||||
|
|
||||||
|
|
||||||
def log_err(celery, msg):
|
def log_err(celery, msg):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.error(f"Task log: {msg}")
|
||||||
logger.error(f"Log {caller_name}: {msg}")
|
|
||||||
|
|
||||||
|
|
||||||
def update(celery, msg, current=1, total=2):
|
def update(celery, msg, current=1, total=2):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.info(f"Task update {current}/{total}: {msg}")
|
||||||
logger.info(f"Update {caller_name} {current}/{total}: {msg}")
|
|
||||||
if celery is None:
|
if celery is None:
|
||||||
return
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
@ -89,8 +82,7 @@ def update(celery, msg, current=1, total=2):
|
|||||||
|
|
||||||
def finish(celery, msg, current=2, total=2):
|
def finish(celery, msg, current=2, total=2):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
caller_name = stack()[1].function
|
logger.info(f"Task update {current}/{total}: Finishing up")
|
||||||
logger.info(f"Update {caller_name} {current}/{total}: Finishing up")
|
|
||||||
if celery is None:
|
if celery is None:
|
||||||
return
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
@ -98,5 +90,5 @@ def finish(celery, msg, current=2, total=2):
|
|||||||
meta={"current": current, "total": total, "status": "Finishing up"},
|
meta={"current": current, "total": total, "status": "Finishing up"},
|
||||||
)
|
)
|
||||||
sleep(1)
|
sleep(1)
|
||||||
logger.info(f"Success {caller_name} {current}/{total}: {msg}")
|
logger.info(f"Success {current}/{total}: {msg}")
|
||||||
return {"status": msg, "current": current, "total": total}
|
return {"status": msg, "current": current, "total": total}
|
||||||
|
@ -481,64 +481,6 @@ def get_autobackup_configuration():
|
|||||||
return config
|
return config
|
||||||
|
|
||||||
|
|
||||||
def get_parsed_automirror_configuration(config_file):
|
|
||||||
"""
|
|
||||||
Load the configuration; this is the same main pvc.conf that the daemons read
|
|
||||||
"""
|
|
||||||
print('Loading configuration from file "{}"'.format(config_file))
|
|
||||||
|
|
||||||
with open(config_file, "r") as cfgfh:
|
|
||||||
try:
|
|
||||||
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"ERROR: Failed to parse configuration file: {e}")
|
|
||||||
os._exit(1)
|
|
||||||
|
|
||||||
config = dict()
|
|
||||||
|
|
||||||
try:
|
|
||||||
o_cluster = o_config["cluster"]
|
|
||||||
config_cluster = {
|
|
||||||
"cluster": o_cluster["name"],
|
|
||||||
"automirror_enabled": True,
|
|
||||||
}
|
|
||||||
config = {**config, **config_cluster}
|
|
||||||
|
|
||||||
o_automirror = o_config["automirror"]
|
|
||||||
if o_automirror is None:
|
|
||||||
config["automirror_enabled"] = False
|
|
||||||
return config
|
|
||||||
|
|
||||||
config_automirror = {
|
|
||||||
"mirror_tags": o_automirror["mirror_tags"],
|
|
||||||
"mirror_destinations": o_automirror["destinations"],
|
|
||||||
"mirror_default_destination": o_automirror["default_destination"],
|
|
||||||
"mirror_keep_snapshots": o_automirror["keep_snapshots"],
|
|
||||||
}
|
|
||||||
config = {**config, **config_automirror}
|
|
||||||
|
|
||||||
if config["mirror_default_destination"] not in [
|
|
||||||
d for d in config["mirror_destinations"].keys()
|
|
||||||
]:
|
|
||||||
raise Exception(
|
|
||||||
"Specified default mirror destination is not in the list of destinations"
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
raise MalformedConfigurationError(e)
|
|
||||||
|
|
||||||
return config
|
|
||||||
|
|
||||||
|
|
||||||
def get_automirror_configuration():
|
|
||||||
"""
|
|
||||||
Get the configuration.
|
|
||||||
"""
|
|
||||||
pvc_config_file = get_configuration_path()
|
|
||||||
config = get_parsed_automirror_configuration(pvc_config_file)
|
|
||||||
return config
|
|
||||||
|
|
||||||
|
|
||||||
def validate_directories(config):
|
def validate_directories(config):
|
||||||
if not os.path.exists(config["dynamic_directory"]):
|
if not os.path.exists(config["dynamic_directory"]):
|
||||||
os.makedirs(config["dynamic_directory"])
|
os.makedirs(config["dynamic_directory"])
|
||||||
|
@ -2107,7 +2107,6 @@ def vm_worker_create_snapshot(
|
|||||||
domain,
|
domain,
|
||||||
snapshot_name=None,
|
snapshot_name=None,
|
||||||
zk_only=False,
|
zk_only=False,
|
||||||
return_status=False,
|
|
||||||
):
|
):
|
||||||
if snapshot_name is None:
|
if snapshot_name is None:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
@ -2125,34 +2124,27 @@ def vm_worker_create_snapshot(
|
|||||||
# Validate that VM exists in cluster
|
# Validate that VM exists in cluster
|
||||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||||
if not dom_uuid:
|
if not dom_uuid:
|
||||||
message = (f"Could not find VM '{domain}' in the cluster",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Could not find VM '{domain}' in the cluster",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
reg = re.compile("^[a-z0-9.-_]+$")
|
reg = re.compile("^[a-z0-9.-_]+$")
|
||||||
if not reg.match(snapshot_name):
|
if not reg.match(snapshot_name):
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
"Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed",
|
"Snapshot name '{snapshot_name}' contains invalid characters; only alphanumeric, '.', '-', and '_' characters are allowed",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid))
|
current_snapshots = zkhandler.children(("domain.snapshots", dom_uuid))
|
||||||
if current_snapshots and snapshot_name in current_snapshots:
|
if current_snapshots and snapshot_name in current_snapshots:
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!",
|
f"Snapshot name '{snapshot_name}' already exists for VM '{domain}'!",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Get the list of all RBD volumes
|
# Get the list of all RBD volumes
|
||||||
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
|
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
|
||||||
@ -2186,12 +2178,11 @@ def vm_worker_create_snapshot(
|
|||||||
)
|
)
|
||||||
if not ret:
|
if not ret:
|
||||||
cleanup_failure()
|
cleanup_failure()
|
||||||
message = (msg.replace("ERROR: ", ""),)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
msg.replace("ERROR: ", ""),
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
else:
|
else:
|
||||||
snap_list.append(f"{pool}/{volume}@{snapshot_name}")
|
snap_list.append(f"{pool}/{volume}@{snapshot_name}")
|
||||||
|
|
||||||
@ -2251,22 +2242,12 @@ def vm_worker_create_snapshot(
|
|||||||
)
|
)
|
||||||
|
|
||||||
current_stage += 1
|
current_stage += 1
|
||||||
message = (f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",)
|
return finish(
|
||||||
if return_status:
|
celery,
|
||||||
finish(
|
f"Successfully created snapshot '{snapshot_name}' of VM '{domain}'",
|
||||||
celery,
|
current=current_stage,
|
||||||
message,
|
total=total_stages,
|
||||||
current=current_stage,
|
)
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
return True, message
|
|
||||||
else:
|
|
||||||
return finish(
|
|
||||||
celery,
|
|
||||||
message,
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def vm_worker_remove_snapshot(
|
def vm_worker_remove_snapshot(
|
||||||
@ -3176,7 +3157,6 @@ def vm_worker_send_snapshot(
|
|||||||
destination_api_verify_ssl=True,
|
destination_api_verify_ssl=True,
|
||||||
incremental_parent=None,
|
incremental_parent=None,
|
||||||
destination_storage_pool=None,
|
destination_storage_pool=None,
|
||||||
return_status=False,
|
|
||||||
):
|
):
|
||||||
|
|
||||||
current_stage = 0
|
current_stage = 0
|
||||||
@ -3191,12 +3171,11 @@ def vm_worker_send_snapshot(
|
|||||||
# Validate that VM exists in cluster
|
# Validate that VM exists in cluster
|
||||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||||
if not dom_uuid:
|
if not dom_uuid:
|
||||||
message = (f"Could not find VM '{domain}' in the cluster",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Could not find VM '{domain}' in the cluster",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Get our side's VM configuration details
|
# Get our side's VM configuration details
|
||||||
try:
|
try:
|
||||||
@ -3205,34 +3184,31 @@ def vm_worker_send_snapshot(
|
|||||||
vm_detail = None
|
vm_detail = None
|
||||||
|
|
||||||
if not isinstance(vm_detail, dict):
|
if not isinstance(vm_detail, dict):
|
||||||
message = (f"VM listing returned invalid data: {vm_detail}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"VM listing returned invalid data: {vm_detail}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Check if the snapshot exists
|
# Check if the snapshot exists
|
||||||
if not zkhandler.exists(
|
if not zkhandler.exists(
|
||||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
|
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
|
||||||
):
|
):
|
||||||
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Check if the incremental parent exists
|
# Check if the incremental parent exists
|
||||||
if incremental_parent is not None and not zkhandler.exists(
|
if incremental_parent is not None and not zkhandler.exists(
|
||||||
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
|
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
|
||||||
):
|
):
|
||||||
message = (f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
vm_name = vm_detail["name"]
|
vm_name = vm_detail["name"]
|
||||||
|
|
||||||
@ -3258,26 +3234,23 @@ def vm_worker_send_snapshot(
|
|||||||
if "PVC API" not in response.json().get("message"):
|
if "PVC API" not in response.json().get("message"):
|
||||||
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
||||||
except requests.exceptions.ConnectionError as e:
|
except requests.exceptions.ConnectionError as e:
|
||||||
message = (f"Connection to remote API timed out: {e}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Connection to remote API timed out: {e}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
message = (f"Connection to remote API is not valid: {e}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Connection to remote API is not valid: {e}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
message = (f"Connection to remote API failed: {e}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Connection to remote API failed: {e}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Hit the API "/status" endpoint to validate API key and cluster status
|
# Hit the API "/status" endpoint to validate API key and cluster status
|
||||||
response = session.get(
|
response = session.get(
|
||||||
@ -3290,14 +3263,11 @@ def vm_worker_send_snapshot(
|
|||||||
"pvc_version", None
|
"pvc_version", None
|
||||||
)
|
)
|
||||||
if current_destination_pvc_version is None:
|
if current_destination_pvc_version is None:
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
"Connection to remote API failed: no PVC version information returned",
|
"Connection to remote API failed: no PVC version information returned",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
expected_destination_pvc_version = "0.9.101"
|
expected_destination_pvc_version = "0.9.101"
|
||||||
# Work around development versions
|
# Work around development versions
|
||||||
@ -3308,14 +3278,11 @@ def vm_worker_send_snapshot(
|
|||||||
if parse_version(current_destination_pvc_version) < parse_version(
|
if parse_version(current_destination_pvc_version) < parse_version(
|
||||||
expected_destination_pvc_version
|
expected_destination_pvc_version
|
||||||
):
|
):
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
|
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Check if the VM already exists on the remote
|
# Check if the VM already exists on the remote
|
||||||
response = session.get(
|
response = session.get(
|
||||||
@ -3334,12 +3301,11 @@ def vm_worker_send_snapshot(
|
|||||||
current_destination_vm_state is not None
|
current_destination_vm_state is not None
|
||||||
and current_destination_vm_state != "mirror"
|
and current_destination_vm_state != "mirror"
|
||||||
):
|
):
|
||||||
message = ("Remote PVC VM exists and is not a mirror",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
"Remote PVC VM exists and is not a mirror",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Get details about VM snapshot
|
# Get details about VM snapshot
|
||||||
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
|
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
|
||||||
@ -3385,38 +3351,31 @@ def vm_worker_send_snapshot(
|
|||||||
|
|
||||||
# Check if this snapshot is in the remote list already
|
# Check if this snapshot is in the remote list already
|
||||||
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
|
if snapshot_name in [s["name"] for s in destination_vm_snapshots]:
|
||||||
message = (f"Snapshot {snapshot_name} already exists on the target",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Snapshot {snapshot_name} already exists on the target",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Check if this snapshot is older than the latest remote VM snapshot
|
# Check if this snapshot is older than the latest remote VM snapshot
|
||||||
if (
|
if (
|
||||||
len(destination_vm_snapshots) > 0
|
len(destination_vm_snapshots) > 0
|
||||||
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
|
and snapshot_timestamp < destination_vm_snapshots[0]["timestamp"]
|
||||||
):
|
):
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
|
f"Target has a newer snapshot ({destination_vm_snapshots[0]['name']}); cannot send old snapshot {snapshot_name}",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Check that our incremental parent exists on the remote VM
|
# Check that our incremental parent exists on the remote VM
|
||||||
if incremental_parent is not None:
|
if incremental_parent is not None:
|
||||||
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
|
if incremental_parent not in [s["name"] for s in destination_vm_snapshots]:
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
|
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Begin send, set stages
|
# Begin send, set stages
|
||||||
total_stages += 1 + (3 * len(snapshot_rbdsnaps))
|
total_stages += 1 + (3 * len(snapshot_rbdsnaps))
|
||||||
@ -3434,25 +3393,6 @@ def vm_worker_send_snapshot(
|
|||||||
"source_snapshot": incremental_parent,
|
"source_snapshot": incremental_parent,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Strip out autobackup and automirror tags
|
|
||||||
# These should never be wanted on the receiving side
|
|
||||||
from daemon_lib.config import (
|
|
||||||
get_autobackup_configuration,
|
|
||||||
get_automirror_configuration,
|
|
||||||
)
|
|
||||||
|
|
||||||
autobackup_config = get_autobackup_configuration()
|
|
||||||
automirror_config = get_automirror_configuration()
|
|
||||||
new_tags = list()
|
|
||||||
for tag in vm_detail["tags"]:
|
|
||||||
tag_base = tag["name"].split(":")[0]
|
|
||||||
if tag_base in [t for t in autobackup_config["backup_tags"]] or tag_base in [
|
|
||||||
t for t in automirror_config["mirror_tags"]
|
|
||||||
]:
|
|
||||||
continue
|
|
||||||
new_tags.append(tag)
|
|
||||||
vm_detail["tags"] = new_tags
|
|
||||||
|
|
||||||
response = session.post(
|
response = session.post(
|
||||||
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
|
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
|
||||||
headers={"Content-Type": "application/json"},
|
headers={"Content-Type": "application/json"},
|
||||||
@ -3460,12 +3400,11 @@ def vm_worker_send_snapshot(
|
|||||||
json=vm_detail,
|
json=vm_detail,
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
message = (f"Failed to send config: {response.json()['message']}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Failed to send config: {response.json()['message']}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Create the block devices on the remote side if this is a new VM send
|
# Create the block devices on the remote side if this is a new VM send
|
||||||
block_t_start = time.time()
|
block_t_start = time.time()
|
||||||
@ -3492,12 +3431,11 @@ def vm_worker_send_snapshot(
|
|||||||
error_message = f"Multiple details returned for volume {rbd_name}"
|
error_message = f"Multiple details returned for volume {rbd_name}"
|
||||||
else:
|
else:
|
||||||
error_message = f"Error getting details for volume {rbd_name}"
|
error_message = f"Error getting details for volume {rbd_name}"
|
||||||
message = (error_message,)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
error_message,
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
local_volume_size = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||||
@ -3522,12 +3460,11 @@ def vm_worker_send_snapshot(
|
|||||||
data=None,
|
data=None,
|
||||||
)
|
)
|
||||||
if response.status_code != 404 and current_destination_vm_state is None:
|
if response.status_code != 404 and current_destination_vm_state is None:
|
||||||
message = (f"Remote storage pool {pool} already contains volume {volume}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Remote storage pool {pool} already contains volume {volume}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
if current_destination_vm_state is not None:
|
if current_destination_vm_state is not None:
|
||||||
try:
|
try:
|
||||||
@ -3537,10 +3474,7 @@ def vm_worker_send_snapshot(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
|
error_message = f"Failed to get volume size for remote {rbd_name}: {e}"
|
||||||
fail(celery, error_message)
|
fail(celery, error_message)
|
||||||
if return_status:
|
return False
|
||||||
return False, error_message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if local_volume_size != remote_volume_size:
|
if local_volume_size != remote_volume_size:
|
||||||
response = session.put(
|
response = session.put(
|
||||||
@ -3548,12 +3482,11 @@ def vm_worker_send_snapshot(
|
|||||||
params={"new_size": local_volume_size, "force": True},
|
params={"new_size": local_volume_size, "force": True},
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
message = ("Failed to resize remote volume to match local volume",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
"Failed to resize remote volume to match local volume",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
|
|
||||||
# Send the volume to the remote
|
# Send the volume to the remote
|
||||||
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
|
||||||
@ -3624,14 +3557,11 @@ def vm_worker_send_snapshot(
|
|||||||
stream=True,
|
stream=True,
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Failed to send diff batch: {response.json()['message']}",
|
f"Failed to send diff batch: {response.json()['message']}",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
current_chunk_time = time.time()
|
current_chunk_time = time.time()
|
||||||
chunk_time = current_chunk_time - last_chunk_time
|
chunk_time = current_chunk_time - last_chunk_time
|
||||||
@ -3679,12 +3609,11 @@ def vm_worker_send_snapshot(
|
|||||||
buffer.clear() # Clear the buffer after sending
|
buffer.clear() # Clear the buffer after sending
|
||||||
buffer_size = 0 # Reset buffer size
|
buffer_size = 0 # Reset buffer size
|
||||||
except Exception:
|
except Exception:
|
||||||
message = (f"Failed to send snapshot: {response.json()['message']}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Failed to send snapshot: {response.json()['message']}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
finally:
|
finally:
|
||||||
image.close()
|
image.close()
|
||||||
ioctx.close()
|
ioctx.close()
|
||||||
@ -3728,14 +3657,11 @@ def vm_worker_send_snapshot(
|
|||||||
data=full_chunker(),
|
data=full_chunker(),
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
message = (
|
fail(
|
||||||
|
celery,
|
||||||
f"Failed to send snapshot: {response.json()['message']}",
|
f"Failed to send snapshot: {response.json()['message']}",
|
||||||
)
|
)
|
||||||
fail(celery, message)
|
return False
|
||||||
if return_status:
|
|
||||||
return False, message
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
finally:
|
finally:
|
||||||
image.close()
|
image.close()
|
||||||
ioctx.close()
|
ioctx.close()
|
||||||
@ -3752,12 +3678,11 @@ def vm_worker_send_snapshot(
|
|||||||
params=send_params,
|
params=send_params,
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
message = (f"Failed to send snapshot: {response.json()['message']}",)
|
fail(
|
||||||
fail(celery, message)
|
celery,
|
||||||
if return_status:
|
f"Failed to send snapshot: {response.json()['message']}",
|
||||||
return False, message
|
)
|
||||||
else:
|
return False
|
||||||
return False
|
|
||||||
finally:
|
finally:
|
||||||
image.close()
|
image.close()
|
||||||
ioctx.close()
|
ioctx.close()
|
||||||
@ -3767,24 +3692,12 @@ def vm_worker_send_snapshot(
|
|||||||
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
|
block_mbps = round(block_total_mb / (block_t_end - block_t_start), 1)
|
||||||
|
|
||||||
current_stage += 1
|
current_stage += 1
|
||||||
message = (
|
return finish(
|
||||||
|
celery,
|
||||||
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
|
f"Successfully sent snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}' (average {block_mbps} MB/s)",
|
||||||
|
current=current_stage,
|
||||||
|
total=total_stages,
|
||||||
)
|
)
|
||||||
if return_status:
|
|
||||||
finish(
|
|
||||||
celery,
|
|
||||||
message,
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
return True, message
|
|
||||||
else:
|
|
||||||
return finish(
|
|
||||||
celery,
|
|
||||||
message,
|
|
||||||
current=current_stage,
|
|
||||||
total=total_stages,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def vm_worker_create_mirror(
|
def vm_worker_create_mirror(
|
||||||
|
@ -393,8 +393,6 @@ api:
|
|||||||
private_key: ""
|
private_key: ""
|
||||||
|
|
||||||
# Automatic backups
|
# Automatic backups
|
||||||
# If this section is present, autobackups will be enabled; otherwise, they will be disabled.
|
|
||||||
# The pvc-ansible roles manage this including the various timer units, so avoid adjusting this manually.
|
|
||||||
autobackup:
|
autobackup:
|
||||||
|
|
||||||
# Backup root path on the node, used as the remote mountpoint
|
# Backup root path on the node, used as the remote mountpoint
|
||||||
@ -453,55 +451,5 @@ autobackup:
|
|||||||
# This example shows a generic umount leveraging the backup_root_path variable
|
# This example shows a generic umount leveraging the backup_root_path variable
|
||||||
- "/usr/bin/umount {backup_root_path}"
|
- "/usr/bin/umount {backup_root_path}"
|
||||||
|
|
||||||
# Automatic mirroring to peer clusters
|
|
||||||
# If this section is present, automirrors will be enabled; otherwise, they will be disabled.
|
|
||||||
# The pvc-ansible roles manage this including the various timer units, so avoid adjusting this manually.
|
|
||||||
automirror:
|
|
||||||
|
|
||||||
# Destination clusters
|
|
||||||
# A list of destination cluster API endpoints to send mirrors to.
|
|
||||||
# For each entry, the "name" field will be mapped to the "{cluster}" variable in the tag(s)
|
|
||||||
# above. For more details on how exactly this works, please consult the documentation.
|
|
||||||
destinations:
|
|
||||||
|
|
||||||
# An example entry; contains the same information as a "pvc connection" entry
|
|
||||||
# The key in this dictionary is the "name" of the cluster, which is what must be suffixed
|
|
||||||
# to a tag and is displayed in the report and status output.
|
|
||||||
cluster2:
|
|
||||||
# The destination address, either an IP or an FQDN the destination API is reachable at
|
|
||||||
address: pvc.cluster2.mydomain.tld
|
|
||||||
# The destination port (usually 7370)
|
|
||||||
port: 7370
|
|
||||||
# The API prefix (usually '/api/v1') without a trailing slash
|
|
||||||
prefix: "/api/v1"
|
|
||||||
# The API key of the destination
|
|
||||||
key: 00000000-0000-0000-0000-000000000000
|
|
||||||
# Whether or not to use SSL for the connection
|
|
||||||
ssl: yes
|
|
||||||
# Whether or not to verify SSL for the connection
|
|
||||||
verify_ssl: yes
|
|
||||||
# Storage pool for VMs on the destination
|
|
||||||
pool: vms
|
|
||||||
|
|
||||||
# Default destination
|
|
||||||
# The cluster name to send mirrors to for VMs without an explicit "{cluster}" tag
|
|
||||||
# Always required, even if there is only a single destination
|
|
||||||
default_destination: cluster2
|
|
||||||
|
|
||||||
# VM tag(s) to mirror
|
|
||||||
# Only VMs with at least one of the given tag(s) will be mirrored; all others will be skipped
|
|
||||||
# All mirror tags support suffixing a ":{cluster}" argument, which will override the default
|
|
||||||
# cluster and send mirrors to the given cluster name (in the list below). Multiple suffixed
|
|
||||||
# tags are supported; if more than one is, the VM will be mirrored to all specified clusters.
|
|
||||||
mirror_tags:
|
|
||||||
- "automirror"
|
|
||||||
|
|
||||||
# The number of snapshots to keep, on both sides - mirror snapshots older than the last
|
|
||||||
# X snapshots will be automatically removed to save space
|
|
||||||
# Depending on the interval specified in the pvc-ansible variables, this may be either a
|
|
||||||
# relatively short or relatively long time.
|
|
||||||
keep_snapshots: 7
|
|
||||||
|
|
||||||
|
|
||||||
# VIM modeline, requires "set modeline" in your VIMRC
|
# VIM modeline, requires "set modeline" in your VIMRC
|
||||||
# vim: expandtab shiftwidth=2 tabstop=2 filetype=yaml
|
# vim: expandtab shiftwidth=2 tabstop=2 filetype=yaml
|
||||||
|
@ -53,9 +53,6 @@ from daemon_lib.vmbuilder import (
|
|||||||
from daemon_lib.autobackup import (
|
from daemon_lib.autobackup import (
|
||||||
worker_cluster_autobackup,
|
worker_cluster_autobackup,
|
||||||
)
|
)
|
||||||
from daemon_lib.automirror import (
|
|
||||||
worker_cluster_automirror,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Daemon version
|
# Daemon version
|
||||||
version = "0.9.103"
|
version = "0.9.103"
|
||||||
@ -125,26 +122,6 @@ def cluster_autobackup(self, force_full=False, email_recipients=None, run_on="pr
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cluster.automirror", bind=True, routing_key="run_on")
|
|
||||||
def cluster_automirror(
|
|
||||||
self, email_recipients=None, email_errors_only=False, run_on="primary"
|
|
||||||
):
|
|
||||||
@ZKConnection(config)
|
|
||||||
def run_cluster_automirror(
|
|
||||||
zkhandler, self, email_recipients=None, email_errors_only=False
|
|
||||||
):
|
|
||||||
return worker_cluster_automirror(
|
|
||||||
zkhandler,
|
|
||||||
self,
|
|
||||||
email_recipients=email_recipients,
|
|
||||||
email_errors_only=email_errors_only,
|
|
||||||
)
|
|
||||||
|
|
||||||
return run_cluster_automirror(
|
|
||||||
self, email_recipients=email_recipients, email_errors_only=email_errors_only
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
|
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
|
||||||
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
|
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
|
||||||
@ZKConnection(config)
|
@ZKConnection(config)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user