Compare commits
6 Commits
73c0834f85
...
fe1740ecd9
Author | SHA1 | Date |
---|---|---|
Joshua Boniface | fe1740ecd9 | |
Joshua Boniface | 5c3ce358df | |
Joshua Boniface | cf5ee23a18 | |
Joshua Boniface | c766648ed0 | |
Joshua Boniface | 6960513bbd | |
Joshua Boniface | a2e5df9f6d |
|
@ -19,6 +19,13 @@
|
||||||
#
|
#
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from os import path
|
||||||
|
|
||||||
|
# Ensure current directory (/usr/share/pvc) is in the system path for Gunicorn
|
||||||
|
current_dir = path.dirname(path.abspath(__file__))
|
||||||
|
sys.path.append(current_dir)
|
||||||
|
|
||||||
import pvcapid.Daemon # noqa: F401
|
import pvcapid.Daemon # noqa: F401
|
||||||
|
|
||||||
pvcapid.Daemon.entrypoint()
|
pvcapid.Daemon.entrypoint()
|
||||||
|
|
|
@ -19,15 +19,15 @@
|
||||||
#
|
#
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
from ssl import SSLContext, TLSVersion
|
from ssl import SSLContext, TLSVersion
|
||||||
|
|
||||||
from distutils.util import strtobool as dustrtobool
|
from distutils.util import strtobool as dustrtobool
|
||||||
|
|
||||||
import daemon_lib.config as cfg
|
import daemon_lib.config as cfg
|
||||||
|
|
||||||
# Daemon version
|
# Daemon version
|
||||||
version = "0.9.100"
|
version = "0.9.100~git-73c0834f"
|
||||||
|
|
||||||
# API version
|
# API version
|
||||||
API_VERSION = 1.0
|
API_VERSION = 1.0
|
||||||
|
@ -37,7 +37,6 @@ API_VERSION = 1.0
|
||||||
# Helper Functions
|
# Helper Functions
|
||||||
##########################################################
|
##########################################################
|
||||||
|
|
||||||
|
|
||||||
def strtobool(stringv):
|
def strtobool(stringv):
|
||||||
if stringv is None:
|
if stringv is None:
|
||||||
return False
|
return False
|
||||||
|
@ -53,7 +52,6 @@ def strtobool(stringv):
|
||||||
# Configuration Parsing
|
# Configuration Parsing
|
||||||
##########################################################
|
##########################################################
|
||||||
|
|
||||||
|
|
||||||
# Get our configuration
|
# Get our configuration
|
||||||
config = cfg.get_configuration()
|
config = cfg.get_configuration()
|
||||||
config["daemon_name"] = "pvcapid"
|
config["daemon_name"] = "pvcapid"
|
||||||
|
@ -61,22 +59,15 @@ config["daemon_version"] = version
|
||||||
|
|
||||||
|
|
||||||
##########################################################
|
##########################################################
|
||||||
# Entrypoint
|
# Flask App Creation for Gunicorn
|
||||||
##########################################################
|
##########################################################
|
||||||
|
|
||||||
|
def create_app():
|
||||||
def entrypoint():
|
"""
|
||||||
import pvcapid.flaskapi as pvc_api # noqa: E402
|
Create and return the Flask app and SSL context if necessary.
|
||||||
|
"""
|
||||||
if config["api_ssl_enabled"]:
|
# Import the Flask app from pvcapid.flaskapi after adjusting the path
|
||||||
context = SSLContext()
|
import pvcapid.flaskapi as pvc_api
|
||||||
context.minimum_version = TLSVersion.TLSv1
|
|
||||||
context.get_ca_certs()
|
|
||||||
context.load_cert_chain(
|
|
||||||
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
context = None
|
|
||||||
|
|
||||||
# Print our startup messages
|
# Print our startup messages
|
||||||
print("")
|
print("")
|
||||||
|
@ -102,9 +93,59 @@ def entrypoint():
|
||||||
print("")
|
print("")
|
||||||
|
|
||||||
pvc_api.celery_startup()
|
pvc_api.celery_startup()
|
||||||
pvc_api.app.run(
|
|
||||||
config["api_listen_address"],
|
return pvc_api.app
|
||||||
config["api_listen_port"],
|
|
||||||
threaded=True,
|
|
||||||
ssl_context=context,
|
##########################################################
|
||||||
)
|
# Entrypoint
|
||||||
|
##########################################################
|
||||||
|
|
||||||
|
def entrypoint():
|
||||||
|
if config['debug']:
|
||||||
|
app = create_app()
|
||||||
|
|
||||||
|
if config["api_ssl_enabled"]:
|
||||||
|
ssl_context = SSLContext()
|
||||||
|
ssl_context.minimum_version = TLSVersion.TLSv1
|
||||||
|
ssl_context.get_ca_certs()
|
||||||
|
ssl_context.load_cert_chain(
|
||||||
|
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
ssl_context = None
|
||||||
|
|
||||||
|
app.run(
|
||||||
|
config["api_listen_address"],
|
||||||
|
config["api_listen_port"],
|
||||||
|
threaded=True,
|
||||||
|
ssl_context=ssl_context,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Build the command to run Gunicorn
|
||||||
|
gunicorn_cmd = [
|
||||||
|
'gunicorn',
|
||||||
|
'--workers', '1',
|
||||||
|
'--threads', '8',
|
||||||
|
'--timeout', '86400',
|
||||||
|
'--bind', '{}:{}'.format(config["api_listen_address"], config["api_listen_port"]),
|
||||||
|
'pvcapid.Daemon:create_app()',
|
||||||
|
'--log-level', 'info',
|
||||||
|
'--access-logfile', '-',
|
||||||
|
'--error-logfile', '-',
|
||||||
|
]
|
||||||
|
|
||||||
|
if config["api_ssl_enabled"]:
|
||||||
|
gunicorn_cmd += [
|
||||||
|
'--certfile', config["api_ssl_cert_file"],
|
||||||
|
'--keyfile', config["api_ssl_key_file"]
|
||||||
|
]
|
||||||
|
|
||||||
|
# Run Gunicorn
|
||||||
|
try:
|
||||||
|
subprocess.run(gunicorn_cmd)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
exit(0)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
exit(1)
|
||||||
|
|
|
@ -2801,7 +2801,7 @@ class API_VM_Locks(Resource):
|
||||||
- vm
|
- vm
|
||||||
responses:
|
responses:
|
||||||
202:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: string
|
type: string
|
||||||
description: The Celery job ID of the task
|
description: The Celery job ID of the task
|
||||||
|
@ -2924,11 +2924,11 @@ class API_VM_Device(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The raw Libvirt XML definition of the device to attach
|
description: The raw Libvirt XML definition of the device to attach
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -2978,11 +2978,11 @@ class API_VM_Device(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The raw Libvirt XML definition of the device to detach
|
description: The raw Libvirt XML definition of the device to detach
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -3234,11 +3234,11 @@ class API_VM_Snapshot(Resource):
|
||||||
required: false
|
required: false
|
||||||
description: A custom name for the snapshot instead of autogeneration by date
|
description: A custom name for the snapshot instead of autogeneration by date
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Execution error
|
description: Execution error
|
||||||
schema:
|
schema:
|
||||||
|
@ -3292,11 +3292,11 @@ class API_VM_Snapshot(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The name of the snapshot to remove
|
description: The name of the snapshot to remove
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Execution error
|
description: Execution error
|
||||||
schema:
|
schema:
|
||||||
|
@ -3356,11 +3356,11 @@ class API_VM_Snapshot_Rollback(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The name of the snapshot to roll back to
|
description: The name of the snapshot to roll back to
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Execution error
|
description: Execution error
|
||||||
schema:
|
schema:
|
||||||
|
@ -3436,15 +3436,15 @@ class API_VM_Snapshot_Export(Resource):
|
||||||
description: The absolute file path to export the snapshot to on the active primary coordinator
|
description: The absolute file path to export the snapshot to on the active primary coordinator
|
||||||
- in: query
|
- in: query
|
||||||
name: incremental_parent
|
name: incremental_parent
|
||||||
type: boolean
|
type: string
|
||||||
required: false
|
required: false
|
||||||
description: A snapshot name to generate an incremental diff from
|
description: A snapshot name to generate an incremental diff from
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Execution error
|
description: Execution error
|
||||||
schema:
|
schema:
|
||||||
|
@ -3529,11 +3529,11 @@ class API_VM_Snapshot_Import(Resource):
|
||||||
default: true
|
default: true
|
||||||
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
|
description: Whether or not to retain the (parent, if incremental) volume snapshot after restore
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Execution error
|
description: Execution error
|
||||||
schema:
|
schema:
|
||||||
|
@ -3572,6 +3572,220 @@ class API_VM_Snapshot_Import(Resource):
|
||||||
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
|
api.add_resource(API_VM_Snapshot_Import, "/vm/<vm>/snapshot/import")
|
||||||
|
|
||||||
|
|
||||||
|
# /vm/<vm>/snapshot/send
|
||||||
|
class API_VM_Snapshot_Send(Resource):
|
||||||
|
@RequestParser(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "snapshot_name",
|
||||||
|
"required": True,
|
||||||
|
"helptext": "A snapshot name must be specified",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "destination_api_uri",
|
||||||
|
"required": True,
|
||||||
|
"helptext": "A destination API URI must be specified",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "destination_api_key",
|
||||||
|
"required": True,
|
||||||
|
"helptext": "A destination API key must be specified",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "destination_api_verify_ssl",
|
||||||
|
"required": False,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "incremental_parent",
|
||||||
|
"required": False,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "destination_storage_pool",
|
||||||
|
"required": False,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@Authenticator
|
||||||
|
def post(self, vm, reqargs):
|
||||||
|
"""
|
||||||
|
Send a snapshot of a VM's disks and configuration to another PVC cluster
|
||||||
|
---
|
||||||
|
tags:
|
||||||
|
- vm
|
||||||
|
parameters:
|
||||||
|
- in: query
|
||||||
|
name: snapshot_name
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The name of the snapshot to export (must exist)
|
||||||
|
- in: query
|
||||||
|
name: destination_api_uri
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The base API URI of the destination PVC cluster (with prefix if applicable)
|
||||||
|
- in: query
|
||||||
|
name: destination_api_key
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The API authentication key of the destination PVC cluster
|
||||||
|
- in: query
|
||||||
|
name: destination_api_verify_ssl
|
||||||
|
type: boolean
|
||||||
|
required: false
|
||||||
|
default: true
|
||||||
|
description: Whether or not to validate SSL certificates for an SSL-enabled destination API
|
||||||
|
- in: query
|
||||||
|
name: incremental_parent
|
||||||
|
type: string
|
||||||
|
required: false
|
||||||
|
description: A snapshot name to generate an incremental diff from; incremental send only if unset
|
||||||
|
- in: query
|
||||||
|
name: destination_storage_pool
|
||||||
|
type: string
|
||||||
|
required: false
|
||||||
|
default: source storage pool name
|
||||||
|
description: The remote cluster storage pool to create RBD volumes in, if different from the source storage pool
|
||||||
|
responses:
|
||||||
|
202:
|
||||||
|
description: Accepted
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
description: The Celery job ID of the task
|
||||||
|
400:
|
||||||
|
description: Execution error
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
|
404:
|
||||||
|
description: Not found
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
|
"""
|
||||||
|
snapshot_name = reqargs.get("snapshot_name", None)
|
||||||
|
destination_api_uri = reqargs.get("destination_api_uri", None)
|
||||||
|
destination_api_key = reqargs.get("destination_api_key", None)
|
||||||
|
destination_api_verify_ssl = vool(strtobool(reqargs.get("destination_api_verify_ssl", "true")))
|
||||||
|
incremental_parent = reqargs.get("incremental_parent", None)
|
||||||
|
destination_storage_pool = reqargs.get("destination_storage_pool", None)
|
||||||
|
|
||||||
|
task = run_celery_task(
|
||||||
|
"vm.send_snapshot",
|
||||||
|
domain=vm,
|
||||||
|
snapshot_name=snapshot_name,
|
||||||
|
destination_api_uri=destination_api_uri,
|
||||||
|
destination_api_key=destination_api_key,
|
||||||
|
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||||
|
incremental_parent=incremental_parent,
|
||||||
|
destination_storage_pool=destination_storage_pool,
|
||||||
|
run_on="primary",
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
{
|
||||||
|
"task_id": task.id,
|
||||||
|
"task_name": "vm.send_snapshot",
|
||||||
|
"run_on": f"{get_primary_node()} (primary)",
|
||||||
|
},
|
||||||
|
202,
|
||||||
|
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
api.add_resource(API_VM_Snapshot_Send, "/vm/<vm>/snapshot/send")
|
||||||
|
|
||||||
|
|
||||||
|
# /vm/<vm>/snapshot/receive/block
|
||||||
|
class API_VM_Snapshot_Receive_Block(Resource):
|
||||||
|
@RequestParser(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "pool",
|
||||||
|
"required": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "volume",
|
||||||
|
"required": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "snapshot",
|
||||||
|
"required": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "size",
|
||||||
|
"required": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "source_snapshot",
|
||||||
|
"required": False,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@Authenticator
|
||||||
|
def post(self, vm, reqargs):
|
||||||
|
"""
|
||||||
|
Receive a snapshot of a single RBD volume from another PVC cluster; may be full or incremental
|
||||||
|
|
||||||
|
NOTICE: This is an API-internal endpoint used by /vm/<vm>/snapshot/send; it should never be called by a client.
|
||||||
|
---
|
||||||
|
tags:
|
||||||
|
- vm
|
||||||
|
parameters:
|
||||||
|
- in: query
|
||||||
|
name: pool
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The name of the destination Ceph RBD data pool
|
||||||
|
- in: query
|
||||||
|
name: volume
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The name of the destination Ceph RBD volume
|
||||||
|
- in: query
|
||||||
|
name: snapshot
|
||||||
|
type: string
|
||||||
|
required: true
|
||||||
|
description: The name of the destination Ceph RBD volume snapshot
|
||||||
|
- in: query
|
||||||
|
name: size
|
||||||
|
type: integer
|
||||||
|
required: true
|
||||||
|
description: The size in bytes of the Ceph RBD volume
|
||||||
|
- in: query
|
||||||
|
name: source_snapshot
|
||||||
|
type: string
|
||||||
|
required: false
|
||||||
|
description: The name of the destination Ceph RBD volume snapshot parent for incremental transfers
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: OK
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
|
400:
|
||||||
|
description: Execution error
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
|
404:
|
||||||
|
description: Not found
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
|
"""
|
||||||
|
return api_helper.vm_snapshot_receive_block(
|
||||||
|
reqargs.get("pool"),
|
||||||
|
reqargs.get("volume") + "_recv",
|
||||||
|
reqargs.get("snapshot"),
|
||||||
|
int(reqargs.get("size")),
|
||||||
|
flask.request.stream,
|
||||||
|
source_snapshot=reqargs.get("source_snapshot"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
api.add_resource(API_VM_Snapshot_Receive_Block, "/vm/<vm>/snapshot/receive/block")
|
||||||
|
|
||||||
|
|
||||||
# /vm/autobackup
|
# /vm/autobackup
|
||||||
class API_VM_Autobackup_Root(Resource):
|
class API_VM_Autobackup_Root(Resource):
|
||||||
@RequestParser(
|
@RequestParser(
|
||||||
|
@ -3601,14 +3815,11 @@ class API_VM_Autobackup_Root(Resource):
|
||||||
type: string
|
type: string
|
||||||
example: "user@domain.tld"
|
example: "user@domain.tld"
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
properties:
|
description: The Celery job ID of the task
|
||||||
task_id:
|
|
||||||
type: string
|
|
||||||
description: Task ID for the provisioner Celery worker
|
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -5173,11 +5384,16 @@ class API_Storage_Ceph_Benchmark(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The PVC storage pool to benchmark
|
description: The PVC storage pool to benchmark
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: string
|
type: string
|
||||||
description: The Celery job ID of the benchmark (unused elsewhere)
|
description: The Celery job ID of the task
|
||||||
|
400:
|
||||||
|
description: Bad request
|
||||||
|
schema:
|
||||||
|
type: object
|
||||||
|
id: Message
|
||||||
"""
|
"""
|
||||||
# Verify that the pool is valid
|
# Verify that the pool is valid
|
||||||
_list, code = api_helper.ceph_pool_list(
|
_list, code = api_helper.ceph_pool_list(
|
||||||
|
@ -5301,11 +5517,11 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
|
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -5500,11 +5716,11 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||||
required: false
|
required: false
|
||||||
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
|
description: If set, create this many OSDs on the block device instead of 1; usually 2 or 4 depending on size
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -5615,11 +5831,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||||
required: false
|
required: false
|
||||||
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
|
description: If set, creates an OSD DB LV for the replacement OSD with this explicit size in human units (e.g. 1024M, 20G); if unset, use existing ext_db_size
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -5673,11 +5889,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
|
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
@ -5731,7 +5947,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||||
- in: query
|
- in: query
|
||||||
name: force
|
name: force
|
||||||
type: boolean
|
type: boolean
|
||||||
required: flase
|
required: false
|
||||||
description: Force removal even if some step(s) fail
|
description: Force removal even if some step(s) fail
|
||||||
- in: query
|
- in: query
|
||||||
name: yes-i-really-mean-it
|
name: yes-i-really-mean-it
|
||||||
|
@ -5739,11 +5955,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||||
required: true
|
required: true
|
||||||
description: A confirmation string to ensure that the API consumer really means it
|
description: A confirmation string to ensure that the API consumer really means it
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
id: Message
|
description: The Celery job ID of the task
|
||||||
404:
|
404:
|
||||||
description: Not found
|
description: Not found
|
||||||
schema:
|
schema:
|
||||||
|
@ -6331,7 +6547,7 @@ class API_Storage_Ceph_Volume_Root(Resource):
|
||||||
name: force
|
name: force
|
||||||
type: boolean
|
type: boolean
|
||||||
required: false
|
required: false
|
||||||
default: flase
|
default: false
|
||||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
|
@ -6408,7 +6624,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||||
name: force
|
name: force
|
||||||
type: boolean
|
type: boolean
|
||||||
required: false
|
required: false
|
||||||
default: flase
|
default: false
|
||||||
description: Force action if volume creation would violate 80% full soft cap on the pool
|
description: Force action if volume creation would violate 80% full soft cap on the pool
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
|
@ -6460,7 +6676,7 @@ class API_Storage_Ceph_Volume_Element(Resource):
|
||||||
name: force
|
name: force
|
||||||
type: boolean
|
type: boolean
|
||||||
required: false
|
required: false
|
||||||
default: flase
|
default: false
|
||||||
description: Force action if new volume size would violate 80% full soft cap on the pool
|
description: Force action if new volume size would violate 80% full soft cap on the pool
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
|
@ -6585,7 +6801,7 @@ class API_Storage_Ceph_Volume_Element_Clone(Resource):
|
||||||
name: force
|
name: force
|
||||||
type: boolean
|
type: boolean
|
||||||
required: false
|
required: false
|
||||||
default: flase
|
default: false
|
||||||
description: Force action if clone volume size would violate 80% full soft cap on the pool
|
description: Force action if clone volume size would violate 80% full soft cap on the pool
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
|
@ -9440,14 +9656,11 @@ class API_Provisioner_Create_Root(Resource):
|
||||||
type: string
|
type: string
|
||||||
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
|
description: Script install() function keywork argument in "arg=data" format; may be specified multiple times to add multiple arguments
|
||||||
responses:
|
responses:
|
||||||
200:
|
202:
|
||||||
description: OK
|
description: Accepted
|
||||||
schema:
|
schema:
|
||||||
type: object
|
type: string
|
||||||
properties:
|
description: The Celery job ID of the task
|
||||||
task_id:
|
|
||||||
type: string
|
|
||||||
description: Task ID for the provisioner Celery worker
|
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
schema:
|
schema:
|
||||||
|
|
|
@ -1280,7 +1280,7 @@ def vm_flush_locks(zkhandler, vm):
|
||||||
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
|
zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
|
||||||
)
|
)
|
||||||
|
|
||||||
if retdata[0].get("state") not in ["stop", "disable"]:
|
if retdata[0].get("state") not in ["stop", "disable", "mirror"]:
|
||||||
return {"message": "VM must be stopped to flush locks"}, 400
|
return {"message": "VM must be stopped to flush locks"}, 400
|
||||||
|
|
||||||
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
|
retflag, retdata = pvc_vm.flush_locks(zkhandler, vm)
|
||||||
|
@ -1294,6 +1294,58 @@ def vm_flush_locks(zkhandler, vm):
|
||||||
return output, retcode
|
return output, retcode
|
||||||
|
|
||||||
|
|
||||||
|
def vm_snapshot_receive_block(pool, volume, snapshot, size, stream, source_snapshot=None):
|
||||||
|
try:
|
||||||
|
import rados
|
||||||
|
import rbd
|
||||||
|
|
||||||
|
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
|
||||||
|
cluster.connect()
|
||||||
|
ioctx = cluster.open_ioctx(pool)
|
||||||
|
|
||||||
|
if not source_snapshot:
|
||||||
|
rbd_inst = rbd.RBD()
|
||||||
|
rbd_inst.create(ioctx, volume, size)
|
||||||
|
|
||||||
|
image = rbd.Image(ioctx, volume)
|
||||||
|
|
||||||
|
last_chunk = 0
|
||||||
|
chunk_size = 1024 * 1024 * 128
|
||||||
|
|
||||||
|
if source_snapshot:
|
||||||
|
# Receiving diff data
|
||||||
|
print(f"Applying diff between {source_snapshot} and {snapshot}")
|
||||||
|
while True:
|
||||||
|
chunk = stream.read(chunk_size)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Extract the offset and length (8 bytes each) and the data
|
||||||
|
offset = int.from_bytes(chunk[:8], 'big')
|
||||||
|
length = int.from_bytes(chunk[8:16], 'big')
|
||||||
|
data = chunk[16:16 + length]
|
||||||
|
image.write(data, offset)
|
||||||
|
|
||||||
|
image.create_snap(snapshot)
|
||||||
|
else:
|
||||||
|
# Receiving full image
|
||||||
|
print(f"Importing full snapshot {snapshot}")
|
||||||
|
while True:
|
||||||
|
chunk = flask.request.stream.read(chunk_size)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
image.write(chunk, last_chunk)
|
||||||
|
last_chunk += len(chunk)
|
||||||
|
|
||||||
|
image.create_snap(snapshot)
|
||||||
|
|
||||||
|
image.close()
|
||||||
|
ioctx.close()
|
||||||
|
cluster.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
return {"message": f"Failed to import block device: {e}"}, 400
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Network functions
|
# Network functions
|
||||||
#
|
#
|
||||||
|
|
|
@ -206,12 +206,12 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
||||||
|
|
||||||
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
|
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
|
||||||
|
|
||||||
vm_states = ["start", "disable"]
|
vm_states = ["start", "disable", "mirror"]
|
||||||
vm_states.extend(
|
vm_states.extend(
|
||||||
[
|
[
|
||||||
state
|
state
|
||||||
for state in data.get("vms", {}).keys()
|
for state in data.get("vms", {}).keys()
|
||||||
if state not in ["total", "start", "disable"]
|
if state not in ["total", "start", "disable", "mirror"]
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -221,7 +221,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
|
||||||
continue
|
continue
|
||||||
if state in ["start"]:
|
if state in ["start"]:
|
||||||
state_colour = ansii["green"]
|
state_colour = ansii["green"]
|
||||||
elif state in ["migrate", "disable", "provision"]:
|
elif state in ["migrate", "disable", "provision", "mirror"]:
|
||||||
state_colour = ansii["blue"]
|
state_colour = ansii["blue"]
|
||||||
elif state in ["stop", "fail"]:
|
elif state in ["stop", "fail"]:
|
||||||
state_colour = ansii["red"]
|
state_colour = ansii["red"]
|
||||||
|
|
|
@ -1760,6 +1760,7 @@ def format_info(config, domain_information, long_output):
|
||||||
"provision": ansiprint.blue(),
|
"provision": ansiprint.blue(),
|
||||||
"restore": ansiprint.blue(),
|
"restore": ansiprint.blue(),
|
||||||
"import": ansiprint.blue(),
|
"import": ansiprint.blue(),
|
||||||
|
"mirror": ansiprint.blue(),
|
||||||
}
|
}
|
||||||
ainformation.append(
|
ainformation.append(
|
||||||
"{}State:{} {}{}{}".format(
|
"{}State:{} {}{}{}".format(
|
||||||
|
|
|
@ -1160,6 +1160,7 @@ def get_resource_metrics(zkhandler):
|
||||||
"fail": 8,
|
"fail": 8,
|
||||||
"import": 9,
|
"import": 9,
|
||||||
"restore": 10,
|
"restore": 10,
|
||||||
|
"mirror": 99,
|
||||||
}
|
}
|
||||||
state = vm["state"]
|
state = vm["state"]
|
||||||
output_lines.append(
|
output_lines.append(
|
||||||
|
|
|
@ -85,6 +85,7 @@ vm_state_combinations = [
|
||||||
"provision",
|
"provision",
|
||||||
"import",
|
"import",
|
||||||
"restore",
|
"restore",
|
||||||
|
"mirror",
|
||||||
]
|
]
|
||||||
ceph_osd_state_combinations = [
|
ceph_osd_state_combinations = [
|
||||||
"up,in",
|
"up,in",
|
||||||
|
|
|
@ -24,6 +24,7 @@ import re
|
||||||
import os.path
|
import os.path
|
||||||
import lxml.objectify
|
import lxml.objectify
|
||||||
import lxml.etree
|
import lxml.etree
|
||||||
|
import subprocess
|
||||||
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
@ -580,7 +581,7 @@ def rename_vm(zkhandler, domain, new_domain):
|
||||||
|
|
||||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||||
state = zkhandler.read(("domain.state", dom_uuid))
|
state = zkhandler.read(("domain.state", dom_uuid))
|
||||||
if state not in ["stop", "disable"]:
|
if state not in ["stop", "disable", "mirror"]:
|
||||||
return (
|
return (
|
||||||
False,
|
False,
|
||||||
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
|
'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(
|
||||||
|
@ -1125,6 +1126,7 @@ def get_list(
|
||||||
"migrate",
|
"migrate",
|
||||||
"unmigrate",
|
"unmigrate",
|
||||||
"provision",
|
"provision",
|
||||||
|
"mirror",
|
||||||
]
|
]
|
||||||
if state not in valid_states:
|
if state not in valid_states:
|
||||||
return False, 'VM state "{}" is not valid.'.format(state)
|
return False, 'VM state "{}" is not valid.'.format(state)
|
||||||
|
@ -1903,10 +1905,10 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||||
|
|
||||||
# Check that the domain is stopped (unless force_unlock is set)
|
# Check that the domain is stopped (unless force_unlock is set)
|
||||||
domain_state = zkhandler.read(("domain.state", dom_uuid))
|
domain_state = zkhandler.read(("domain.state", dom_uuid))
|
||||||
if not force_unlock and domain_state not in ["stop", "disable", "fail"]:
|
if not force_unlock and domain_state not in ["stop", "disable", "fail", "mirror"]:
|
||||||
fail(
|
fail(
|
||||||
celery,
|
celery,
|
||||||
f"VM state {domain_state} not in [stop, disable, fail] and not forcing",
|
f"VM state {domain_state} not in [stop, disable, fail, mirror] and not forcing",
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -2329,7 +2331,7 @@ def vm_worker_rollback_snapshot(zkhandler, celery, domain, snapshot_name):
|
||||||
|
|
||||||
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
# Verify that the VM is in a stopped state; renaming is not supported otherwise
|
||||||
state = zkhandler.read(("domain.state", dom_uuid))
|
state = zkhandler.read(("domain.state", dom_uuid))
|
||||||
if state not in ["stop", "disable"]:
|
if state not in ["stop", "disable", "mirror"]:
|
||||||
fail(
|
fail(
|
||||||
celery,
|
celery,
|
||||||
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
|
f"VM '{domain}' is not stopped or disabled; VMs cannot be rolled back while running",
|
||||||
|
@ -3118,3 +3120,389 @@ def vm_worker_import_snapshot(
|
||||||
current=current_stage,
|
current=current_stage,
|
||||||
total=total_stages,
|
total=total_stages,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def vm_worker_send_snapshot(
|
||||||
|
zkhandler,
|
||||||
|
celery,
|
||||||
|
domain,
|
||||||
|
snapshot_name,
|
||||||
|
destination_api_uri,
|
||||||
|
destination_api_key,
|
||||||
|
destination_api_verify_ssl=True,
|
||||||
|
incremental_parent=None,
|
||||||
|
destination_storage_pool=None,
|
||||||
|
):
|
||||||
|
import requests
|
||||||
|
from packaging.version import parse as parse_version
|
||||||
|
|
||||||
|
current_stage = 0
|
||||||
|
total_stages = 1
|
||||||
|
start(
|
||||||
|
celery,
|
||||||
|
f"Sending snapshot '{snapshot_name}' of VM '{domain}' to remote cluster '{destination_api_uri}'",
|
||||||
|
current=current_stage,
|
||||||
|
total=total_stages,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Validate that VM exists in cluster
|
||||||
|
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||||
|
if not dom_uuid:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Could not find VM '{domain}' in the cluster",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get our side's VM configuration details
|
||||||
|
vm_detail = get_list(zkhandler, limit=dom_uuid, is_fuzzy=False)[1][0]
|
||||||
|
if not isinstance(vm_detail, dict):
|
||||||
|
fail(celery, f"VM listing returned invalid data: {vm_detail}",)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if the snapshot exists
|
||||||
|
if not zkhandler.exists(
|
||||||
|
("domain.snapshots", dom_uuid, "domain_snapshot.name", snapshot_name)
|
||||||
|
):
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if the incremental parent exists
|
||||||
|
if incremental_parent is not None and not zkhandler.exists(
|
||||||
|
("domain.snapshots", dom_uuid, "domain_snapshot.name", incremental_parent)
|
||||||
|
):
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Could not find snapshot '{snapshot_name}' of VM '{domain}'",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Validate that the destination cluster can be reached
|
||||||
|
destination_api_timeout = (3.05, 172800)
|
||||||
|
destination_api_headers = {
|
||||||
|
"X-Api-Key": destination_api_key,
|
||||||
|
"Content-Type": "application/octet-stream",
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Hit the API root; this should return "PVC API version x"
|
||||||
|
response = requests.get(
|
||||||
|
f"{destination_api_uri}/",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=None,
|
||||||
|
params=None,
|
||||||
|
data=None,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
if "PVC API" not in response.json().get("message"):
|
||||||
|
raise ValueError("Remote API is not a PVC API or incorrect URI given")
|
||||||
|
except requests.exceptions.ConnectionError as e:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Connection to remote API timed out: {e}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
except ValueError as e:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Connection to remote API is not valid: {e}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Connection to remote API failed: {e}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Hit the API "/status" endpoint to validate API key and cluster status
|
||||||
|
response = requests.get(
|
||||||
|
f"{destination_api_uri}/status",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=destination_api_headers,
|
||||||
|
params=None,
|
||||||
|
data=None,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
destination_cluster_status = response.json()
|
||||||
|
current_destination_pvc_version = destination_cluster_status.get(
|
||||||
|
"pvc_version", None
|
||||||
|
)
|
||||||
|
if current_destination_pvc_version is None:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
"Connection to remote API failed: no PVC version information returned",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
expected_destination_pvc_version = "0.9.100" # TODO: 0.9.101 when completed
|
||||||
|
if parse_version(current_destination_pvc_version) < parse_version(
|
||||||
|
expected_destination_pvc_version
|
||||||
|
):
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Remote PVC cluster is too old: requires version {expected_destination_pvc_version} or higher",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if the VM already exists on the remote
|
||||||
|
response = requests.get(
|
||||||
|
f"{destination_api_uri}/vm/{domain}",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=destination_api_headers,
|
||||||
|
params=None,
|
||||||
|
data=None,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
destination_vm_status = response.json()
|
||||||
|
current_destination_vm_state = destination_vm_status.get(
|
||||||
|
"state", None
|
||||||
|
)
|
||||||
|
if current_destination_vm_state is not None and current_destination_vm_state != "mirror":
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Remote PVC VM exists and is not a mirror",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get details about VM snapshot
|
||||||
|
_, snapshot_timestamp, snapshot_xml, snapshot_rbdsnaps = zkhandler.read_many(
|
||||||
|
[
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"domain.snapshots",
|
||||||
|
dom_uuid,
|
||||||
|
"domain_snapshot.name",
|
||||||
|
snapshot_name,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"domain.snapshots",
|
||||||
|
dom_uuid,
|
||||||
|
"domain_snapshot.timestamp",
|
||||||
|
snapshot_name,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"domain.snapshots",
|
||||||
|
dom_uuid,
|
||||||
|
"domain_snapshot.xml",
|
||||||
|
snapshot_name,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
(
|
||||||
|
(
|
||||||
|
"domain.snapshots",
|
||||||
|
dom_uuid,
|
||||||
|
"domain_snapshot.rbd_snapshots",
|
||||||
|
snapshot_name,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
snapshot_rbdsnaps = snapshot_rbdsnaps.split(",")
|
||||||
|
|
||||||
|
# Get details about remote VM snapshots
|
||||||
|
destination_vm_snapshots = destination_vm_status.get("snapshots", [])
|
||||||
|
latest_destination_vm_snapshot = destination_vm_snapshots[0]
|
||||||
|
|
||||||
|
# Check if this snapshot is in the remote list already
|
||||||
|
if snapshot_name in [s['name'] for s in destination_vm_snapshots]:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Snapshot {snapshot_name} already exists on the target",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if this snapshot is older than the latest remote VM snapshot
|
||||||
|
if snapshot_timestamp < latest_destination_vm_snapshot["timestamp"]:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Target has a newer snapshot ({latest_destination_vm_snapshot['name']}); cannot send old snapshot {snapshot_name}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check that our incremental parent exists on the remote VM
|
||||||
|
if incremental_parent is not None:
|
||||||
|
if increment_parent not in [s['name'] for s in destination_vm_snapshots]:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Can not send incremental for a snapshot ({incremental_parent}) which does not exist on the target",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Set send type
|
||||||
|
send_type = "incremental" if incremental_parent is not None else "full"
|
||||||
|
|
||||||
|
# Begin send, set stages
|
||||||
|
total_stages = 2 + (2 * len(snapshot_rbdsnaps)) + (len(snapshot_rbdsnaps) if current_destination_vm_state is None else 0)
|
||||||
|
|
||||||
|
# Create the block devices on the remote side if this is a new VM send
|
||||||
|
for rbd in [r for r in vm_detail["disks"] if r['type'] == 'rbd']:
|
||||||
|
pool, volume = rbd["name"].split('/')
|
||||||
|
|
||||||
|
current_stage += 1
|
||||||
|
update(
|
||||||
|
celery,
|
||||||
|
f"Preparing remote volume for {rbd}@{snapshot_name}",
|
||||||
|
current=current_stage,
|
||||||
|
total=total_stages,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the storage volume details
|
||||||
|
retcode, retdata = ceph.get_list_volume(zkhandler, pool, volume, is_fuzzy=False)
|
||||||
|
if not retcode or len(retdata) != 1:
|
||||||
|
if len(retdata) < 1:
|
||||||
|
error_message = f"No detail returned for volume {rbd}"
|
||||||
|
elif len(retdata) > 1:
|
||||||
|
error_message = f"Multiple details returned for volume {rbd}"
|
||||||
|
else:
|
||||||
|
error_message = f"Error getting details for volume {rbd}"
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
error_message,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
size_bytes = ceph.format_bytes_fromhuman(retdata[0]["stats"]["size"])
|
||||||
|
except Exception as e:
|
||||||
|
error_message = f"Failed to get volume size for {rbd}: {e}"
|
||||||
|
|
||||||
|
if destination_storage_pool is not None:
|
||||||
|
pool = destination_storage_pool
|
||||||
|
|
||||||
|
if current_destination_vm_state is None:
|
||||||
|
current_stage += 1
|
||||||
|
update(
|
||||||
|
celery,
|
||||||
|
f"Creating remote volume {pool}/{volume} for {rbd}@{snapshot_name}",
|
||||||
|
current=current_stage,
|
||||||
|
total=total_stages,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the volume exists on the target
|
||||||
|
response = requests.get(
|
||||||
|
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=destination_api_headers,
|
||||||
|
params=None,
|
||||||
|
data=None,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
if response.status_code != 404:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Remote storage pool {pool} already contains volume {volume}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Create the volume on the target
|
||||||
|
params = {
|
||||||
|
"size": size_bytes,
|
||||||
|
}
|
||||||
|
response = requests.post(
|
||||||
|
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=destination_api_headers,
|
||||||
|
params=params,
|
||||||
|
data=None,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
destination_volume_create_status = response.json()
|
||||||
|
if response.status_code != 200:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Failed to create volume {pool}/{volume} on target: {destination_volume_create_status['message']}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Send the volume to the remote
|
||||||
|
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
|
||||||
|
cluster.connect()
|
||||||
|
ioctx = cluster.open_ioctx(pool)
|
||||||
|
image = rbd.Image(ioctx, name=volume, snapshot=snapshot, read_only=True)
|
||||||
|
size = image.size()
|
||||||
|
chunk_size_mb = 128
|
||||||
|
|
||||||
|
if incremental_parent is not None:
|
||||||
|
# Diff between incremental_parent and snapshot
|
||||||
|
def diff_chunker():
|
||||||
|
def diff_cb(offset, length, exists):
|
||||||
|
""" Callback to handle diff regions """
|
||||||
|
if exists:
|
||||||
|
data = image.read(offset, length)
|
||||||
|
yield (offset.to_bytes(8, 'big') + length.to_bytes(8, 'big') + data)
|
||||||
|
|
||||||
|
image.set_snap(incremental_parent)
|
||||||
|
image.diff_iterate(0, size, source_snapshot, diff_cb)
|
||||||
|
data_stream = diff_chunker()
|
||||||
|
celery_message = f"Sending diff between {incremental_parent} and {snapshot_name} for {rbd}")
|
||||||
|
else:
|
||||||
|
# Full image transfer
|
||||||
|
def chunker():
|
||||||
|
d_start = time.time()
|
||||||
|
chunk_size = 1024 * 1024 * chunk_size_mb
|
||||||
|
current_chunk = 0
|
||||||
|
while current_chunk < size:
|
||||||
|
t_end = time.time()
|
||||||
|
t_tot = t_end - t_start
|
||||||
|
chunk = image.read(current_chunk, chunk_size)
|
||||||
|
yield chunk
|
||||||
|
current_chunk += chunk_size
|
||||||
|
data_stream = chunker()
|
||||||
|
celery_message = f"Sending full image of {rbd}@{snapshot_name}"
|
||||||
|
|
||||||
|
current_stage += 1
|
||||||
|
update(
|
||||||
|
celery,
|
||||||
|
f"Sending volume {rbd}@{snapshot_name} to target ({send_type})",
|
||||||
|
current=current_stage,
|
||||||
|
total=total_stages,
|
||||||
|
)
|
||||||
|
|
||||||
|
send_params = {
|
||||||
|
'pool': pool,
|
||||||
|
'volume': volume,
|
||||||
|
'snapshot': snapshot_name,
|
||||||
|
'size': size,
|
||||||
|
'source_snapshot': incremental_parent,
|
||||||
|
}
|
||||||
|
send_headers = {
|
||||||
|
'X-Api-Key': destination_api_key,,
|
||||||
|
'Content-Type': 'application/octet-stream',
|
||||||
|
'Transfer-Encoding': None # Disable chunked transfer encoding
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
f"{destination_api_uri}/storage/ceph/volume/{pool}/{volume}",
|
||||||
|
timeout=destination_api_timeout,
|
||||||
|
headers=send_headers,
|
||||||
|
params=send_params,
|
||||||
|
data=data_stream,
|
||||||
|
verify=destination_api_verify_ssl,
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
except Exception as e:
|
||||||
|
fail(
|
||||||
|
celery,
|
||||||
|
f"Failed to send snapshot: {e}",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
image.close()
|
||||||
|
ioctx.close()
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
# Send the VM configuration
|
||||||
|
if current_destination_vm_state is None:
|
||||||
|
# This is a new VM, so define it
|
||||||
|
#response = requests.post()
|
||||||
|
else:
|
||||||
|
# This is a modification
|
||||||
|
#response = requests.post()
|
||||||
|
|
|
@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
|
||||||
|
|
||||||
Package: pvc-daemon-api
|
Package: pvc-daemon-api
|
||||||
Architecture: all
|
Architecture: all
|
||||||
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
|
Depends: systemd, pvc-daemon-common, gunicorn, python3-gunicorn, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
|
||||||
Description: Parallel Virtual Cluster API daemon
|
Description: Parallel Virtual Cluster API daemon
|
||||||
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
||||||
.
|
.
|
||||||
|
|
|
@ -33,6 +33,7 @@ from daemon_lib.vm import (
|
||||||
vm_worker_rollback_snapshot,
|
vm_worker_rollback_snapshot,
|
||||||
vm_worker_export_snapshot,
|
vm_worker_export_snapshot,
|
||||||
vm_worker_import_snapshot,
|
vm_worker_import_snapshot,
|
||||||
|
vm_worker_send_snapshot,
|
||||||
)
|
)
|
||||||
from daemon_lib.ceph import (
|
from daemon_lib.ceph import (
|
||||||
osd_worker_add_osd,
|
osd_worker_add_osd,
|
||||||
|
@ -227,6 +228,39 @@ def vm_import_snapshot(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="vm.send_snapshot", bind=True, routing_key="run_on")
|
||||||
|
def vm_send_snapshot(
|
||||||
|
self,
|
||||||
|
domain=None,
|
||||||
|
snapshot_name=None,
|
||||||
|
destination_api_uri="",
|
||||||
|
destination_api_key="",
|
||||||
|
destination_api_verify_ssl=True,
|
||||||
|
incremental_parent=None,
|
||||||
|
destination_storage_pool=None,
|
||||||
|
run_on="primary",
|
||||||
|
):
|
||||||
|
@ZKConnection(config)
|
||||||
|
def run_vm_send_snapshot(
|
||||||
|
zkhandler, self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=True, incremental_parent=None, destination_storage_pool=None,
|
||||||
|
):
|
||||||
|
return vm_worker_send_snapshot(
|
||||||
|
zkhandler,
|
||||||
|
self,
|
||||||
|
domain,
|
||||||
|
snapshot_name,
|
||||||
|
destination_api_uri,
|
||||||
|
destination_api_key,
|
||||||
|
destination_api_verify_ssl=destination_api_verify_ssl,
|
||||||
|
incremental_parent=incremental_parent,
|
||||||
|
destination_storage_pool=destination_storage_pool,
|
||||||
|
)
|
||||||
|
|
||||||
|
return run_vm_send_snapshot(
|
||||||
|
self, domain, snapshot_name, destination_api_uri, destination_api_key, destination_api_verify_ssl=destination_api_verify_ssl, incremental_parent=incremental_parent, destination_storage_pool=destination_storage_pool
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
||||||
def osd_add(
|
def osd_add(
|
||||||
self,
|
self,
|
||||||
|
|
Loading…
Reference in New Issue