Compare commits

...

6 Commits

Author SHA1 Message Date
Joshua Boniface fe1740ecd9 Add VM snapshot send (initial) 2024-09-13 11:52:08 -04:00
Joshua Boniface 5c3ce358df Update responses for Celery tasks 2024-09-13 11:47:04 -04:00
Joshua Boniface cf5ee23a18 Fix incorrect default value typos 2024-09-13 11:09:00 -04:00
Joshua Boniface c766648ed0 Add "mirror" VM state 2024-09-11 10:12:29 -04:00
Joshua Boniface 6960513bbd Add Ceph block receive (initial) 2024-09-11 00:51:07 -04:00
Joshua Boniface a2e5df9f6d Add support for Gunicorn execution
Modifies pvcapid to run under Gunicorn when in non-debug mode, instead
of the Flask development server. This is proper practice for one, and
also helps increase performance slightly in some workloads (file uploads
mainly).
2024-09-09 13:20:03 -04:00
11 changed files with 845 additions and 107 deletions

View File

@ -19,6 +19,13 @@
# #
############################################################################### ###############################################################################
import sys
from os import path
# Ensure current directory (/usr/share/pvc) is in the system path for Gunicorn
current_dir = path.dirname(path.abspath(__file__))
sys.path.append(current_dir)
import pvcapid.Daemon # noqa: F401 import pvcapid.Daemon # noqa: F401
pvcapid.Daemon.entrypoint() pvcapid.Daemon.entrypoint()

View File

@ -19,15 +19,15 @@
# #
############################################################################### ###############################################################################
import sys
import os
import subprocess
from ssl import SSLContext, TLSVersion from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg import daemon_lib.config as cfg
# Daemon version # Daemon version
version = "0.9.100" version = "0.9.100~git-73c0834f"
# API version # API version
API_VERSION = 1.0 API_VERSION = 1.0
@ -37,7 +37,6 @@ API_VERSION = 1.0
# Helper Functions # Helper Functions
########################################################## ##########################################################
def strtobool(stringv): def strtobool(stringv):
if stringv is None: if stringv is None:
return False return False
@ -53,7 +52,6 @@ def strtobool(stringv):
# Configuration Parsing # Configuration Parsing
########################################################## ##########################################################
# Get our configuration # Get our configuration
config = cfg.get_configuration() config = cfg.get_configuration()
config["daemon_name"] = "pvcapid" config["daemon_name"] = "pvcapid"
@ -61,22 +59,15 @@ config["daemon_version"] = version
 ##########################################################
-# Entrypoint
+# Flask App Creation for Gunicorn
 ##########################################################
-def entrypoint():
-    import pvcapid.flaskapi as pvc_api  # noqa: E402
-
-    if config["api_ssl_enabled"]:
-        context = SSLContext()
-        context.minimum_version = TLSVersion.TLSv1
-        context.get_ca_certs()
-        context.load_cert_chain(
-            config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
-        )
-    else:
-        context = None
+def create_app():
+    """
+    Create and return the Flask app and SSL context if necessary.
+    """
+    # Import the Flask app from pvcapid.flaskapi after adjusting the path
+    import pvcapid.flaskapi as pvc_api
     # Print our startup messages
     print("")
@ -102,9 +93,59 @@ def entrypoint():
     print("")
     pvc_api.celery_startup()
-    pvc_api.app.run(
-        config["api_listen_address"],
-        config["api_listen_port"],
-        threaded=True,
-        ssl_context=context,
-    )
+    return pvc_api.app
+
+##########################################################
+# Entrypoint
+##########################################################
def entrypoint():
if config['debug']:
app = create_app()
if config["api_ssl_enabled"]:
ssl_context = SSLContext()
ssl_context.minimum_version = TLSVersion.TLSv1
ssl_context.get_ca_certs()
ssl_context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
ssl_context = None
app.run(
config["api_listen_address"],
config["api_listen_port"],
threaded=True,
ssl_context=ssl_context,
)
else:
# Build the command to run Gunicorn
gunicorn_cmd = [
'gunicorn',
'--workers', '1',
'--threads', '8',
'--timeout', '86400',
'--bind', '{}:{}'.format(config["api_listen_address"], config["api_listen_port"]),
'pvcapid.Daemon:create_app()',
'--log-level', 'info',
'--access-logfile', '-',
'--error-logfile', '-',
]
if config["api_ssl_enabled"]:
gunicorn_cmd += [
'--certfile', config["api_ssl_cert_file"],
'--keyfile', config["api_ssl_key_file"]
]
# Run Gunicorn
try:
subprocess.run(gunicorn_cmd)
except KeyboardInterrupt:
exit(0)
except Exception as e:
print(e)
exit(1)

View File

@ -2801,7 +2801,7 @@ class API_VM_Locks(Resource):
- vm - vm
responses: responses:
202: 202:
description: OK description: Accepted
schema: schema:
type: string type: string
description: The Celery job ID of the task description: The Celery job ID of the task
@ -2924,11 +2924,11 @@ class API_VM_Device(Resource):
required: true required: true
description: The raw Libvirt XML definition of the device to attach description: The raw Libvirt XML definition of the device to attach
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -2978,11 +2978,11 @@ class API_VM_Device(Resource):
required: true required: true
description: The raw Libvirt XML definition of the device to detach description: The raw Libvirt XML definition of the device to detach
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -3234,11 +3234,11 @@ class API_VM_Snapshot(Resource):
required: false required: false
description: A custom name for the snapshot instead of autogeneration by date description: A custom name for the snapshot instead of autogeneration by date
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3292,11 +3292,11 @@ class API_VM_Snapshot(Resource):
required: true required: true
description: The name of the snapshot to remove description: The name of the snapshot to remove
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3356,11 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
required: true required: true
description: The name of the snapshot to roll back to description: The name of the snapshot to roll back to
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3436,15 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
description: The absolute file path to export the snapshot to on the active primary coordinator description: The absolute file path to export the snapshot to on the active primary coordinator
- in: query - in: query
name: incremental_parent name: incremental_parent
type: boolean type: string
required: false required: false
description: A snapshot name to generate an incremental diff from description: A snapshot name to generate an incremental diff from
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3529,11 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
default: true default: true
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Execution error description: Execution error
schema: schema:
@ -3572,6 +3572,220 @@ class API_VM_Snapshot_Import(Resource):
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import") api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
# /vm/<vm>/snapshot/send
class API_VM_Snapshot_Send(Resource):
@RequestParser(
[
{
"name": "snapshot_name",
"required": True,
"helptext": "A snapshot name must be specified",
},
{
"name": "destination_api_uri",
"required": True,
"helptext": "A destination API URI must be specified",
},
{
"name": "destination_api_key",
"required": True,
"helptext": "A destination API key must be specified",
},
{
"name": "destination_api_verify_ssl",
"required": False,
},
{
"name": "incremental_parent",
"required": False,
},
{
"name": "destination_storage_pool",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Send a snapshot of a VM's disks and configuration to another PVC cluster
---
tags:
- vm
parameters:
- in: query
name: snapshot_name
type: string
required: true
description: The name of the snapshot to export (must exist)
- in: query
name: destination_api_uri
type: string
required: true
description: The base API URI of the destination PVC cluster (with prefix if applicable)
- in: query
name: destination_api_key
type: string
required: true
description: The API authentication key of the destination PVC cluster
- in: query
name: destination_api_verify_ssl
type: boolean
required: false
default: true
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
- in: query
name: incremental_parent
type: string
required: false
description: A snapshot name to generate an incremental diff from; a full send is performed if unset
- in: query
name: destination_storage_pool
type: string
required: false
default: source storage pool name
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
responses:
202:
description: Accepted
schema:
type: string
description: The Celery job ID of the task
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
snapshot_name = reqargs.get("snapshot_name", None)
destination_api_uri = reqargs.get("destination_api_uri", None)
destination_api_key = reqargs.get("destination_api_key", None)
destination_api_verify_ssl = bool(strtobool(reqargs.get("destination_api_verify_ssl", "true")))
incremental_parent = reqargs.get("incremental_parent", None)
destination_storage_pool = reqargs.get("destination_storage_pool", None)
task = run_celery_task(
"vm.send_snapshot",
domain=vm,
snapshot_name=snapshot_name,
destination_api_uri=destination_api_uri,
destination_api_key=destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
run_on="primary",
)
return (
{
"task_id": task.id,
"task_name": "vm.send_snapshot",
"run_on": f"{get_primary_node()} (primary)",
},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
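
As a usage illustration of the new endpoint above, a client triggers a snapshot send with a single authenticated POST carrying query parameters; the cluster addresses, API keys, and snapshot names below are hypothetical placeholders, not values from this changeset:

    import requests

    # Hypothetical source cluster API and credentials
    api_uri = "https://pvc1.local:7370/api/v1"
    headers = {"X-Api-Key": "source-cluster-api-key"}
    params = {
        "snapshot_name": "20240913-1",
        "destination_api_uri": "https://pvc2.local:7370/api/v1",
        "destination_api_key": "remote-cluster-api-key",
        "destination_api_verify_ssl": "true",
        # "incremental_parent": "20240912-1",  # include for an incremental send
    }
    response = requests.post(f"{api_uri}/vm/testvm/snapshot/send", headers=headers, params=params)
    print(response.status_code, response.json())  # expect 202 and the Celery task details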
# /vm/<vm>/snapshot/receive/block
class API_VM_Snapshot_Receive_Block(Resource):
@RequestParser(
[
{
"name": "pool",
"required": True,
},
{
"name": "volume",
"required": True,
},
{
"name": "snapshot",
"required": True,
},
{
"name": "size",
"required": True,
},
{
"name": "source_snapshot",
"required": False,
},
]
)
@Authenticator
def post(self, vm, reqargs):
"""
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
---
tags:
- vm
parameters:
- in: query
name: pool
type: string
required: true
description: The name of the destination Ceph RBD data pool
- in: query
name: volume
type: string
required: true
description: The name of the destination Ceph RBD volume
- in: query
name: snapshot
type: string
required: true
description: The name of the destination Ceph RBD volume snapshot
- in: query
name: size
type: integer
required: true
description: The size in bytes of the Ceph RBD volume
- in: query
name: source_snapshot
type: string
required: false
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Execution error
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
"""
return api_helper.vm_snapshot_receive_block(
reqargs.get("pool"),
reqargs.get("volume") + "_recv",
reqargs.get("snapshot"),
int(reqargs.get("size")),
flask.request.stream,
source_snapshot=reqargs.get("source_snapshot"),
)
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
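
For the incremental path, the sender and the receive/block endpoint above share a simple framing: each changed region is encoded as an 8-byte big-endian offset, an 8-byte big-endian length, and then the raw data. A minimal sketch of that framing, assuming the offset/length header layout used in this changeset:

    # Pack one changed region for the wire (offset/length header plus payload)
    def pack_region(offset: int, data: bytes) -> bytes:
        return offset.to_bytes(8, "big") + len(data).to_bytes(8, "big") + data

    # Unpack one region on the receiving side
    def unpack_region(chunk: bytes):
        offset = int.from_bytes(chunk[:8], "big")
        length = int.from_bytes(chunk[8:16], "big")
        return offset, chunk[16:16 + length]

Note that this framing assumes each stream read returns exactly one whole region; a more robust implementation would buffer across reads, since HTTP chunk boundaries are not guaranteed to align with region boundaries.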
# /vm/autobackup # /vm/autobackup
class API_VM_Autobackup_Root(Resource): class API_VM_Autobackup_Root(Resource):
@RequestParser( @RequestParser(
@ -3601,14 +3815,11 @@ class API_VM_Autobackup_Root(Resource):
type: string type: string
example: "user@domain.tld" example: "user@domain.tld"
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
properties: description: The Celery job ID of the task
task_id:
type: string
description: Task ID for the provisioner Celery worker
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5173,11 +5384,16 @@ class API_Storage_Ceph_Benchmark(Resource):
required: true required: true
description: The PVC storage pool to benchmark description: The PVC storage pool to benchmark
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: string type: string
description: The Celery job ID of the benchmark (unused elsewhere) description: The Celery job ID of the task
400:
description: Bad request
schema:
type: object
id: Message
""" """
# Verify that the pool is valid # Verify that the pool is valid
_list, code = api_helper.ceph_pool_list( _list, code = api_helper.ceph_pool_list(
@ -5301,11 +5517,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
required: true required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5500,11 +5716,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
required: false required: false
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5615,11 +5831,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: false required: false
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5673,11 +5889,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: true required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
400: 400:
description: Bad request description: Bad request
schema: schema:
@ -5731,7 +5947,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
- in: query - in: query
name: force name: force
type: boolean type: boolean
required: flase required: false
description: Force removal even if some step(s) fail description: Force removal even if some step(s) fail
- in: query - in: query
name: yes-i-really-mean-it name: yes-i-really-mean-it
@ -5739,11 +5955,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
required: true required: true
description: A confirmation string to ensure that the API consumer really means it description: A confirmation string to ensure that the API consumer really means it
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
id: Message description: The Celery job ID of the task
404: 404:
description: Not found description: Not found
schema: schema:
@ -6331,7 +6547,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if volume creation would violate 80% full soft cap on the pool description: Force action if volume creation would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6408,7 +6624,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if volume creation would violate 80% full soft cap on the pool description: Force action if volume creation would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6460,7 +6676,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if new volume size would violate 80% full soft cap on the pool description: Force action if new volume size would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -6585,7 +6801,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
name: force name: force
type: boolean type: boolean
required: false required: false
default: flase default: false
description: Force action if clone volume size would violate 80% full soft cap on the pool description: Force action if clone volume size would violate 80% full soft cap on the pool
responses: responses:
200: 200:
@ -9440,14 +9656,11 @@ class API_Provisioner_Create_Root(Resource):
type: string type: string
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
responses: responses:
200: 202:
description: OK description: Accepted
schema: schema:
type: object type: string
properties: description: The Celery job ID of the task
task_id:
type: string
description: Task ID for the provisioner Celery worker
400: 400:
description: Bad request description: Bad request
schema: schema:

View File

@ -1280,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
) )
if retdata[0].get("state") not in ["stop", "disable"]: if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
return {"message": "VM must be stopped to flush locks"}, 400 return {"message": "VM must be stopped to flush locks"}, 400
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm) retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
@ -1294,6 +1294,58 @@ def vm_flush_locks(zkhandler, vm):
return output, retcode return output, retcode
def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
try:
import rados
import rbd
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx(pool)
if not source_snapshot:
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, volume, size)
image = rbd.Image(ioctx, volume)
last_chunk = 0
chunk_size = 1024 * 1024 * 128
if source_snapshot:
# Receiving diff data
print(f"Applying diff between {source_snapshot} and {snapshot}")
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
# Extract the offset and length (8 bytes each) and the data
offset = int.from_bytes(chunk[:8], 'big')
length = int.from_bytes(chunk[8:16], 'big')
data = chunk[16:16 + length]
image.write(data, offset)
image.create_snap(snapshot)
else:
# Receiving full image
print(f"Importing full snapshot {snapshot}")
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
image.write(chunk, last_chunk)
last_chunk += len(chunk)
image.create_snap(snapshot)
image.close()
ioctx.close()
cluster.shutdown()
except Exception as e:
return {"message": f"Failed to import block device: {e}"}, 400
# #
# Network functions # Network functions
# #

View File

@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}") output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
vm_states = ["start", "disable"] vm_states = ["start", "disable", "mirror"]
vm_states.extend( vm_states.extend(
[ [
state state
for state in data.get("vms", {}).keys() for state in data.get("vms", {}).keys()
if state not in ["total", "start", "disable"] if state not in ["total", "start", "disable", "mirror"]
] ]
) )
@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
continue continue
if state in ["start"]: if state in ["start"]:
state_colour = ansii["green"] state_colour = ansii["green"]
elif state in ["migrate", "disable", "provision"]: elif state in ["migrate", "disable", "provision", "mirror"]:
state_colour = ansii["blue"] state_colour = ansii["blue"]
elif state in ["stop", "fail"]: elif state in ["stop", "fail"]:
state_colour = ansii["red"] state_colour = ansii["red"]

View File

@ -1760,6 +1760,7 @@ def format_info(config, domain_information, long_output):
"provision": ansiprint.blue(), "provision": ansiprint.blue(),
"restore": ansiprint.blue(), "restore": ansiprint.blue(),
"import": ansiprint.blue(), "import": ansiprint.blue(),
"mirror": ansiprint.blue(),
} }
ainformation.append( ainformation.append(
"{}State:{} {}{}{}".format( "{}State:{} {}{}{}".format(

View File

@ -1160,6 +1160,7 @@ def get_resource_metrics(zkhandler):
"fail": 8, "fail": 8,
"import": 9, "import": 9,
"restore": 10, "restore": 10,
"mirror": 99,
} }
state = vm["state"] state = vm["state"]
output_lines.append( output_lines.append(

View File

@ -85,6 +85,7 @@ vm_state_combinations = [
"provision", "provision",
"import", "import",
"restore", "restore",
"mirror",
] ]
ceph_osd_state_combinations = [ ceph_osd_state_combinations = [
"up,in", "up,in",

View File

@ -24,6 +24,7 @@ import re
import os.path import os.path
import lxml.objectify import lxml.objectify
import lxml.etree import lxml.etree
import subprocess
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
@ -580,7 +581,7 @@ def rename_vm(zkhandler, domain, new_domain):
# Verify that the VM is in a stopped state; renaming is not supported otherwise # Verify that the VM is in a stopped state; renaming is not supported otherwise
state = zkhandler.read(("domain.state", dom_uuid)) state = zkhandler.read(("domain.state", dom_uuid))
if state not in ["stop", "disable"]: if state not in ["stop", "disable", "mirror"]:
return ( return (
False, False,
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format( 'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
@ -1125,6 +1126,7 @@ def get_list(
"migrate", "migrate",
"unmigrate", "unmigrate",
"provision", "provision",
"mirror",
] ]
if state not in valid_states: if state not in valid_states:
return False, 'VM state "{}" is not valid.'.format(state) return False, 'VM state "{}" is not valid.'.format(state)
@ -1903,10 +1905,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
# Check that the domain is stopped (unless force_unlock is set) # Check that the domain is stopped (unless force_unlock is set)
domain_state = zkhandler.read(("domain.state", dom_uuid)) domain_state = zkhandler.read(("domain.state", dom_uuid))
if not force_unlock and domain_state not in ["stop", "disable", "fail"]: if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
fail( fail(
celery, celery,
f"VM state {domain_state} not in [stop, disable, fail] and not forcing", f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
) )
return False return False
@ -2329,7 +2331,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
# Verify that the VM is in a stopped state; renaming is not supported otherwise # Verify that the VM is in a stopped state; renaming is not supported otherwise
state = zkhandler.read(("domain.state", dom_uuid)) state = zkhandler.read(("domain.state", dom_uuid))
if state not in ["stop", "disable"]: if state not in ["stop", "disable", "mirror"]:
fail( fail(
celery, celery,
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running", f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
@ -3118,3 +3120,389 @@ def vm_worker_import_snapshot(
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
) )
def vm_worker_send_snapshot(
zkhandler,
celery,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
):
import requests
from packaging.version import parse as parse_version
current_stage = 0
total_stages = 1
start(
celery,
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
current=current_stage,
total=total_stages,
)
# Validate that VM exists in cluster
dom_uuid = getDomainUUID(zkhandler, domain)
if not dom_uuid:
fail(
celery,
f"Could not find VM '{domain}' in the cluster",
)
return False
# Get our side's VM configuration details
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
if not isinstance(vm_detail, dict):
fail(celery, f"VM listing returned invalid data: {vm_detail}",)
return False
# Check if the snapshot exists
if not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Check if the incremental parent exists
if incremental_parent is not None and not zkhandler.exists(
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
):
fail(
celery,
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
)
return False
# Validate that the destination cluster can be reached
destination_api_timeout = (3.05, 172800)
destination_api_headers = {
"X-Api-Key": destination_api_key,
"Content-Type": "application/octet-stream",
}
try:
# Hit the API root; this should return "PVC API version x"
response = requests.get(
f"{destination_api_uri}/",
timeout=destination_api_timeout,
headers=None,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if "PVC API" not in response.json().get("message"):
raise ValueError("Remote API is not a PVC API or incorrect URI given")
except requests.exceptions.ConnectionError as e:
fail(
celery,
f"Connection to remote API timed out: {e}",
)
return False
except ValueError as e:
fail(
celery,
f"Connection to remote API is not valid: {e}",
)
return False
except Exception as e:
fail(
celery,
f"Connection to remote API failed: {e}",
)
return False
# Hit the API "/status" endpoint to validate API key and cluster status
response = requests.get(
f"{destination_api_uri}/status",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
destination_cluster_status = response.json()
current_destination_pvc_version = destination_cluster_status.get(
"pvc_version", None
)
if current_destination_pvc_version is None:
fail(
celery,
"Connection to remote API failed: no PVC version information returned",
)
return False
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
if parse_version(current_destination_pvc_version) < parse_version(
expected_destination_pvc_version
):
fail(
celery,
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
)
return False
# Check if the VM already exists on the remote
response = requests.get(
f"{destination_api_uri}/vm/{domain}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
destination_vm_status = response.json()
current_destination_vm_state = destination_vm_status.get(
"state", None
)
if current_destination_vm_state is not None and current_destination_vm_state != "mirror":
fail(
celery,
f"Remote PVC VM exists and is not a mirror",
)
return False
# Get details about VM snapshot
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
[
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.name",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.timestamp",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.xml",
snapshot_name,
)
),
(
(
"domain.snapshots",
dom_uuid,
"domain_snapshot.rbd_snapshots",
snapshot_name,
)
),
]
)
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
# Get details about remote VM snapshots
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
latest_destination_vm_snapshot = (
    destination_vm_snapshots[0] if destination_vm_snapshots else None
)
# Check if this snapshot is in the remote list already
if snapshot_name in [s['name'] for s in destination_vm_snapshots]:
fail(
celery,
f"Snapshot {snapshot_name} already exists on the target",
)
return False
# Check if this snapshot is older than the latest remote VM snapshot
if latest_destination_vm_snapshot is not None and snapshot_timestamp < latest_destination_vm_snapshot["timestamp"]:
fail(
celery,
f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
)
return False
# Check that our incremental parent exists on the remote VM
if incremental_parent is not None:
if incremental_parent not in [s['name'] for s in destination_vm_snapshots]:
fail(
celery,
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
)
return False
# Set send type
send_type = "incremental" if incremental_parent is not None else "full"
# Begin send, set stages
total_stages = 2 + (2 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
# Create the block devices on the remote side if this is a new VM send
for rbd in [r for r in vm_detail["disks"] if r['type'] == 'rbd']:
pool, volume = rbd["name"].split('/')
current_stage += 1
update(
celery,
f"Preparing remote volume for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Get the storage volume details
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
if not retcode or len(retdata) != 1:
if len(retdata) < 1:
error_message = f"No detail returned for volume {rbd}"
elif len(retdata) > 1:
error_message = f"Multiple details returned for volume {rbd}"
else:
error_message = f"Error getting details for volume {rbd}"
fail(
celery,
error_message,
)
return False
try:
    size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
except Exception as e:
    fail(
        celery,
        f"Failed to get volume size for {rbd}: {e}",
    )
    return False
if destination_storage_pool is not None:
pool = destination_storage_pool
if current_destination_vm_state is None:
current_stage += 1
update(
celery,
f"Creating remote volume {pool}/{volume} for {rbd}@{snapshot_name}",
current=current_stage,
total=total_stages,
)
# Check if the volume exists on the target
response = requests.get(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=None,
data=None,
verify=destination_api_verify_ssl,
)
if response.status_code != 404:
fail(
celery,
f"Remote storage pool {pool} already contains volume {volume}",
)
return False
# Create the volume on the target
params = {
"size": size_bytes,
}
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=destination_api_headers,
params=params,
data=None,
verify=destination_api_verify_ssl,
)
destination_volume_create_status = response.json()
if response.status_code != 200:
fail(
celery,
f"Failed to create volume {pool}/{volume} on target: {destination_volume_create_status['message']}",
)
return False
# Send the volume to the remote
# rados/rbd are imported locally as in the receive helper; rbd is aliased to
# avoid clobbering the loop variable "rbd"
import rados
import rbd as rbd_lib

cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx(pool)
image = rbd_lib.Image(ioctx, name=volume, snapshot=snapshot_name, read_only=True)
size = image.size()
chunk_size_mb = 128
if incremental_parent is not None:
    # Diff between incremental_parent and the snapshot this image is opened at
    def diff_chunker():
        regions = []

        def diff_cb(offset, length, exists):
            # Callback to collect each changed region of the diff
            if exists:
                regions.append((offset, length))

        image.diff_iterate(0, size, incremental_parent, diff_cb)
        for offset, length in regions:
            data = image.read(offset, length)
            yield offset.to_bytes(8, 'big') + length.to_bytes(8, 'big') + data

    data_stream = diff_chunker()
    celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd}"
else:
# Full image transfer
def chunker():
    chunk_size = 1024 * 1024 * chunk_size_mb
    current_chunk = 0
    while current_chunk < size:
        chunk = image.read(current_chunk, chunk_size)
        yield chunk
        current_chunk += chunk_size
data_stream = chunker()
celery_message = f"Sending full image of {rbd}@{snapshot_name}"
current_stage += 1
update(
celery,
f"Sending volume {rbd}@{snapshot_name} to target ({send_type})",
current=current_stage,
total=total_stages,
)
send_params = {
'pool': pool,
'volume': volume,
'snapshot': snapshot_name,
'size': size,
'source_snapshot': incremental_parent,
}
send_headers = {
    'X-Api-Key': destination_api_key,
    'Content-Type': 'application/octet-stream',
    'Transfer-Encoding': None,  # Disable chunked transfer encoding
}
try:
response = requests.post(
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
timeout=destination_api_timeout,
headers=send_headers,
params=send_params,
data=data_stream,
verify=destination_api_verify_ssl,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send snapshot: {e}",
)
return False
finally:
image.close()
ioctx.close()
cluster.shutdown()
# Send the VM configuration
if current_destination_vm_state is None:
    # This is a new VM, so define it
    # TODO: response = requests.post()
    pass
else:
    # This is a modification
    # TODO: response = requests.post()
    pass

debian/control vendored
View File

@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
Package: pvc-daemon-api Package: pvc-daemon-api
Architecture: all Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate Depends: systemd, pvc-daemon-common, gunicorn, python3-gunicorn, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager A KVM/Zookeeper/Ceph-based VM and private cloud manager
. .

View File

@ -33,6 +33,7 @@ from daemon_lib.vm import (
vm_worker_rollback_snapshot, vm_worker_rollback_snapshot,
vm_worker_export_snapshot, vm_worker_export_snapshot,
vm_worker_import_snapshot, vm_worker_import_snapshot,
vm_worker_send_snapshot,
) )
from daemon_lib.ceph import ( from daemon_lib.ceph import (
osd_worker_add_osd, osd_worker_add_osd,
@ -227,6 +228,39 @@ def vm_import_snapshot(
) )
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
def vm_send_snapshot(
self,
domain=None,
snapshot_name=None,
destination_api_uri="",
destination_api_key="",
destination_api_verify_ssl=True,
incremental_parent=None,
destination_storage_pool=None,
run_on="primary",
):
@ZKConnection(config)
def run_vm_send_snapshot(
zkhandler, self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=True, incremental_parent=None, destination_storage_pool=None,
):
return vm_worker_send_snapshot(
zkhandler,
self,
domain,
snapshot_name,
destination_api_uri,
destination_api_key,
destination_api_verify_ssl=destination_api_verify_ssl,
incremental_parent=incremental_parent,
destination_storage_pool=destination_storage_pool,
)
return run_vm_send_snapshot(
self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, incremental_parent=incremental_parent, destination_storage_pool=destination_storage_pool
)
@celery.task(name="osd.add", bind=True, routing_key="run_on") @celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add( def osd_add(
self, self,