diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index 67fcf0cf..42e0bed6 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -25,10 +25,16 @@ from functools import wraps
 from flask_restful import Resource, Api, reqparse, abort
 from celery import Celery
 from kombu import Queue
+from lxml.objectify import fromstring as lxml_fromstring
 from daemon_lib.common import getPrimaryNode
 from daemon_lib.zkhandler import ZKConnection
 from daemon_lib.node import get_list as get_node_list
+from daemon_lib.vm import (
+    vm_worker_flush_locks,
+    vm_worker_attach_device,
+    vm_worker_detach_device,
+)
 
 from pvcapid.Daemon import config, strtobool, API_VERSION
@@ -50,10 +56,8 @@ app.config["CELERY_RESULT_BACKEND"] = "redis://{}:{}{}".format(
     config["queue_host"], config["queue_port"], config["queue_path"]
 )
+
 # Set up Celery queues
-app.config["CELERY_DATABASE_ENGINE_OPTIONS"] = {"echo": True}
-
-
 @ZKConnection(config)
 def get_all_nodes(zkhandler):
     _, all_nodes = get_node_list(zkhandler, None)
@@ -206,6 +210,33 @@ def run_benchmark(self, pool):
     return api_benchmark.run_benchmark(self, pool)
 
 
+@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
+def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
+    @ZKConnection(config)
+    def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
+        return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
+
+    return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
+
+
+@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
+def vm_device_attach(self, domain, xml, run_on=None):
+    @ZKConnection(config)
+    def run_vm_device_attach(zkhandler, self, domain, xml):
+        return vm_worker_attach_device(zkhandler, self, domain, xml)
+
+    return run_vm_device_attach(self, domain, xml)
+
+
+@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
+def vm_device_detach(self, domain, xml, run_on=None):
+    @ZKConnection(config)
+    def run_vm_device_detach(zkhandler, self, domain, xml):
+        return vm_worker_detach_device(zkhandler, self, domain, xml)
+
+    return run_vm_device_detach(self, domain, xml)
+
+
 ##########################################################
 # API Root/Authentication
 ##########################################################
@@ -629,6 +660,106 @@ class API_Status(Resource):
 api.add_resource(API_Status, "/status")
 
 
+# /tasks
+class API_Tasks(Resource):
+    @Authenticator
+    def get(self):
+        """
+        Return a list of active Celery worker tasks
+        ---
+        tags:
+          - root
+        responses:
+          200:
+            description: OK
+            schema:
+              type: object
+              properties:
+                active:
+                  type: object
+                  description: Celery app.control.inspect active tasks
+                reserved:
+                  type: object
+                  description: Celery app.control.inspect reserved tasks
+                scheduled:
+                  type: object
+                  description: Celery app.control.inspect scheduled tasks
+        """
+        queue = celery.control.inspect()
+        response = {
+            "scheduled": queue.scheduled(),
+            "active": queue.active(),
+            "reserved": queue.reserved(),
+        }
+        return response
+
+
+api.add_resource(API_Tasks, "/tasks")
+
+
+# /tasks/<task_id>
+class API_Tasks_Element(Resource):
+    @Authenticator
+    def get(self, task_id):
+        """
+        View status of a Celery worker task {task_id}
+        ---
+        tags:
+          - provisioner
+        responses:
+          200:
+            description: OK
+            schema:
+              type: object
+              properties:
+                total:
+                  type: integer
+                  description: Total number of steps
+                current:
+                  type: integer
+                  description: Current steps completed
+                state:
+                  type: string
+                  description: Current job state
+                status:
+                  type: string
+                  description: Status details about job
+          404:
+            description: Not found
+            schema:
+              type: object
+              id: Message
+        """
+        task = celery.AsyncResult(task_id)
+        if task.state == "PENDING":
+            response = {
+                "state": task.state,
+                "current": 0,
+                "total": 1,
+                "status": "Pending job start",
+            }
+        elif task.state != "FAILURE":
+            response = {
+                "state": task.state,
+                "current": task.info.get("current", 0),
+                "total": task.info.get("total", 1),
+                "status": task.info.get("status", ""),
+            }
+            if "result" in task.info:
+                response["result"] = task.info["result"]
+        else:
+            response = {
+                "state": task.state,
+                "current": 1,
+                "total": 1,
+                "status": str(task.info),
+            }
+        return response
+
+
+api.add_resource(API_Tasks_Element, "/tasks/<task_id>")
+
+
 ##########################################################
 # Client API - Node
 ##########################################################
@@ -2168,18 +2299,25 @@ class API_VM_Locks(Resource):
         tags:
           - vm
         responses:
-          200:
+          202:
             description: OK
             schema:
-              type: object
-              id: Message
-          400:
-            description: Bad request
-            schema:
-              type: object
-              id: Message
+              type: string
+              description: The Celery job ID of the task
         """
-        return api_helper.vm_flush_locks(vm)
+        vm_node_detail, retcode = api_helper.vm_node(vm)
+        if retcode == 200:
+            vm_node = vm_node_detail["node"]
+        else:
+            return vm_node_detail, retcode
+
+        task = vm_flush_locks.delay(vm, run_on=vm_node)
+
+        return (
+            {"task_id": task.id, "run_on": vm_node},
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
+        )
 
 
 api.add_resource(API_VM_Locks, "/vm/<vm>/locks")
@@ -2296,7 +2434,25 @@ class API_VM_Device(Resource):
               type: object
               id: Message
         """
-        return api_helper.vm_attach_device(vm, reqargs.get("xml", None))
+        try:
+            xml = reqargs.get("xml", None)
+            lxml_fromstring(xml)
+        except Exception:
+            return {"message": "Specified XML document is not valid"}, 400
+
+        vm_node_detail, retcode = api_helper.vm_node(vm)
+        if retcode == 200:
+            vm_node = vm_node_detail["node"]
+        else:
+            return vm_node_detail, retcode
+
+        task = vm_device_attach.delay(vm, xml, run_on=vm_node)
+
+        return (
+            {"task_id": task.id, "run_on": vm_node},
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
+        )
 
     @RequestParser(
         [
@@ -2332,7 +2488,25 @@ class API_VM_Device(Resource):
               type: object
               id: Message
         """
-        return api_helper.vm_detach_device(vm, reqargs.get("xml", None))
+        try:
+            xml = reqargs.get("xml", None)
+            lxml_fromstring(xml)
+        except Exception:
+            return {"message": "Specified XML document is not valid"}, 400
+
+        vm_node_detail, retcode = api_helper.vm_node(vm)
+        if retcode == 200:
+            vm_node = vm_node_detail["node"]
+        else:
+            return vm_node_detail, retcode
+
+        task = vm_device_detach.delay(vm, xml, run_on=vm_node)
+
+        return (
+            {"task_id": task.id, "run_on": vm_node},
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
+        )
 
 
 api.add_resource(API_VM_Device, "/vm/<vm>/device")
@@ -8161,7 +8335,7 @@ class API_Provisioner_Status_Root(Resource):
                   type: object
                   description: Celery app.control.inspect scheduled tasks
         """
-        queue = celery.control.inspect(timeout=0.1)
+        queue = celery.control.inspect()
         response = {
             "scheduled": queue.scheduled(),
             "active": queue.active(),
diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py
index 87b55872..064d3093 100755
--- a/api-daemon/pvcapid/helper.py
+++ b/api-daemon/pvcapid/helper.py
@@ -355,6 +355,9 @@ def vm_node(zkhandler, vm):
         zkhandler, None, None, None, vm, is_fuzzy=False, negate=False
     )
 
+    if len(retdata) > 0:
+        retdata = retdata[0]
+
     if retflag:
         if retdata:
             retcode = 200
diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py
index a067c7ec..88222db7 100644
--- a/client-cli/pvc/cli/cli.py
+++ b/client-cli/pvc/cli/cli.py
@@ -1567,12 +1567,25 @@ def cli_vm_unmigrate(domain, wait, force_live):
 )
 @connection_req
 @click.argument("domain")
-def cli_vm_flush_locks(domain):
+@click.option(
+    "--wait/--no-wait",
+    "wait_flag",
+    is_flag=True,
+    default=True,
+    show_default=True,
+    help="Wait or don't wait for task to complete, showing progress",
+)
+def cli_vm_flush_locks(domain, wait_flag):
     """
-    Flush stale RBD locks for virtual machine DOMAIN. DOMAIN may be a UUID or name. DOMAIN must be in a stopped state before flushing locks.
+    Flush stale RBD locks for virtual machine DOMAIN. DOMAIN may be a UUID or name. DOMAIN must be in the stop, disable, or fail state before flushing locks.
+
+    NOTE: This is a task-based command. The "--wait" flag (default) will block and show progress. Specifying the "--no-wait" flag will return immediately with a job ID instead, which can be queried externally later.
     """
-    retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain)
+    retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag=wait_flag)
+
+    if retcode and wait_flag:
+        retmsg = wait_for_flush_locks(CLI_CONFIG, retmsg)
 
     finish(retcode, retmsg)
diff --git a/client-cli/pvc/cli/helpers.py b/client-cli/pvc/cli/helpers.py
index efa86c44..583d7743 100644
--- a/client-cli/pvc/cli/helpers.py
+++ b/client-cli/pvc/cli/helpers.py
@@ -191,6 +191,66 @@ def update_store(store_path, store_data):
         jdump(store_data, fh, sort_keys=True, indent=4)
 
 
+def wait_for_flush_locks(CLI_CONFIG, task_detail):
+    """
+    Wait for a flush_locks task to complete
+    """
+
+    task_id = task_detail["task_id"]
+    run_on = task_detail["run_on"]
+
+    echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}")
+    echo(CLI_CONFIG, "")
+
+    # Wait for the task to start
+    echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
+    while True:
+        sleep(0.25)
+        task_status = pvc.lib.common.task_status(
+            CLI_CONFIG, task_id=task_id, is_watching=True
+        )
+        if task_status.get("state") != "PENDING":
+            break
+        echo(CLI_CONFIG, ".", newline=False)
+    echo(CLI_CONFIG, " done.")
+    echo(CLI_CONFIG, "")
+
+    # Start following the task state, updating progress as we go
+    total_task = task_status.get("total")
+    with progressbar(length=total_task, show_eta=False) as bar:
+        last_task = 0
+        maxlen = 0
+        while True:
+            sleep(0.25)
+            if task_status.get("state") != "RUNNING":
+                break
+            if task_status.get("current") > last_task:
+                current_task = int(task_status.get("current"))
+                bar.update(current_task - last_task)
+                last_task = current_task
+                # The extensive spaces at the end cause this to overwrite longer previous messages
+                curlen = len(str(task_status.get("status")))
+                if curlen > maxlen:
+                    maxlen = curlen
+                lendiff = maxlen - curlen
+                overwrite_whitespace = " " * lendiff
+                echo(
+                    CLI_CONFIG,
+                    " " + task_status.get("status") + overwrite_whitespace,
+                    newline=False,
+                )
+            task_status = pvc.lib.common.task_status(
+                CLI_CONFIG, task_id=task_id, is_watching=True
+            )
+        if task_status.get("state") == "SUCCESS":
+            bar.update(total_task - last_task)
+
+    echo(CLI_CONFIG, "")
+    retdata = task_status.get("state") + ": " + task_status.get("status")
+
+    return retdata
+
+
 def wait_for_provisioner(CLI_CONFIG, task_id):
     """
     Wait for a provisioner task to complete
diff --git a/client-cli/pvc/lib/common.py b/client-cli/pvc/lib/common.py
index 8071884c..a25c4e61 100644
--- a/client-cli/pvc/lib/common.py
+++ b/client-cli/pvc/lib/common.py
@@ -24,6 +24,7 @@ import math
 import time
 import requests
 import click
+from ast import literal_eval
 
 from urllib3 import disable_warnings
 
@@ -199,3 +200,64 @@ def call_api(
 
     # Return the response object
     return response
+
+
+def task_status(config, task_id=None, is_watching=False):
+    """
+    Get information about Celery job {task_id}, or all tasks if None
+
+    API endpoint: GET /api/v1/tasks/{task_id}
+    API arguments:
+    API schema: {json_data_object}
+    """
+    if task_id is not None:
+        response = call_api(config, "get", f"/tasks/{task_id}")
+    else:
+        response = call_api(config, "get", "/tasks")
+
+    if task_id is not None:
+        if response.status_code == 200:
+            retvalue = True
+            respjson = response.json()
+            if is_watching:
+                # Just return the raw JSON to the watching process instead of including value
+                return respjson
+            else:
+                return retvalue, respjson
+        else:
+            retvalue = False
+            retdata = response.json().get("message", "")
+    else:
+        retvalue = True
+        task_data_raw = response.json()
+        # Format the Celery data into a more useful data structure
+        task_data = list()
+        for task_type in ["active", "reserved", "scheduled"]:
+            try:
+                type_data = task_data_raw[task_type]
+            except Exception:
+                type_data = None
+
+            if not type_data:
+                type_data = dict()
+            for task_host in type_data:
+                for task_job in task_data_raw[task_type][task_host]:
+                    task = dict()
+                    if task_type == "reserved":
+                        task["type"] = "pending"
+                    else:
+                        task["type"] = task_type
+                    task["worker"] = task_host
+                    task["id"] = task_job.get("id")
+                    try:
+                        task["args"] = literal_eval(task_job.get("args"))
+                    except Exception:
+                        task["args"] = task_job.get("args")
+                    try:
+                        task["kwargs"] = literal_eval(task_job.get("kwargs"))
+                    except Exception:
+                        task["kwargs"] = task_job.get("kwargs")
+                    task_data.append(task)
+        retdata = task_data
+
+    return retvalue, retdata
diff --git a/client-cli/pvc/lib/vm.py b/client-cli/pvc/lib/vm.py
index e74c5e30..d3fa3b98 100644
--- a/client-cli/pvc/lib/vm.py
+++ b/client-cli/pvc/lib/vm.py
@@ -152,7 +152,7 @@ def vm_device_attach(config, vm, xml):
     data = {"xml": xml}
     response = call_api(config, "post", "/vm/{vm}/device".format(vm=vm), data=data)
 
-    if response.status_code == 200:
+    if response.status_code in [200, 202]:
         retstatus = True
     else:
         retstatus = False
@@ -171,7 +171,7 @@ def vm_device_detach(config, vm, xml):
     data = {"xml": xml}
     response = call_api(config, "delete", "/vm/{vm}/device".format(vm=vm), data=data)
 
-    if response.status_code == 200:
+    if response.status_code in [200, 202]:
         retstatus = True
     else:
         retstatus = False
@@ -415,7 +415,7 @@ def vm_node(config, vm, target_node, action, force=False, wait=False, force_live
     return retstatus, response.json().get("message", "")
 
 
-def vm_locks(config, vm):
+def vm_locks(config, vm, wait_flag=False):
     """
     Flush RBD locks of (stopped) VM
 
@@ -423,14 +423,23 @@ def vm_locks(config, vm):
     API arguments:
     API schema: {"message":"{data}"}
     """
-    response = call_api(config, "post", "/vm/{vm}/locks".format(vm=vm))
+    response = call_api(config, "post", f"/vm/{vm}/locks")
 
-    if response.status_code == 200:
-        retstatus = True
+    if response.status_code == 202:
+        retvalue = True
+        retjson = response.json()
+        if not wait_flag:
+            retdata = (
+                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
+            )
+        else:
+            # Just return the task JSON without formatting
+            retdata = response.json()
     else:
-        retstatus = False
+        retvalue = False
+        retdata = response.json().get("message", "")
 
-    return retstatus, response.json().get("message", "")
+    return retvalue, retdata
 
 
 def vm_backup(config, vm, backup_path, incremental_parent=None, retain_snapshot=False):
diff --git a/daemon-common/celery.py b/daemon-common/celery.py
new file mode 100644
index 00000000..a4be1e46
--- /dev/null
+++ b/daemon-common/celery.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+# celery.py - PVC client function library, Celery helper functions
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2023 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+
+import sys
+
+from logging import getLogger
+from time import sleep
+
+
+class TaskFailure(Exception):
+    pass
+
+
+def start(celery, msg, current=1, total=2):
+    logger = getLogger(__name__)
+    logger.info(f"Starting: {msg}")
+    celery.update_state(
+        state="RUNNING", meta={"current": current, "total": total, "status": msg}
+    )
+    sleep(0.5)
+
+
+def fail(celery, msg, current=1, total=2):
+    logger = getLogger(__name__)
+    logger.error(msg)
+    sys.tracebacklimit = 0
+    raise TaskFailure(msg)
+
+
+def update(celery, msg, current=1, total=2):
+    logger = getLogger(__name__)
+    logger.info(f"Task update: {msg}")
+    celery.update_state(
+        state="RUNNING", meta={"current": current, "total": total, "status": msg}
+    )
+    sleep(0.5)
+
+
+def finish(celery, msg, current=2, total=2):
+    logger = getLogger(__name__)
+    celery.update_state(
+        state="RUNNING",
+        meta={"current": current, "total": total, "status": "Finishing up"},
+    )
+    sleep(0.25)
+    logger.info(f"Success: {msg}")
+    return {"status": msg, "current": current, "total": total}
diff --git a/daemon-common/vm.py b/daemon-common/vm.py
index fcc0b44b..15b7d4cd 100644
--- a/daemon-common/vm.py
+++ b/daemon-common/vm.py
@@ -30,14 +30,17 @@ from datetime import datetime
 from distutils.util import strtobool
 from json import dump as jdump
 from json import load as jload
+from json import loads as jloads
+from libvirt import open as lvopen
 from shutil import rmtree
 from socket import gethostname
 from uuid import UUID
 
 import daemon_lib.common as common
-
 import daemon_lib.ceph as ceph
+
 from daemon_lib.network import set_sriov_vf_vm, unset_sriov_vf_vm
+from daemon_lib.celery import start, update, fail, finish
 
 
 #
@@ -340,100 +343,6 @@ def define_vm(
     )
 
 
-def attach_vm_device(zkhandler, domain, device_spec_xml):
-    # Validate that VM exists in cluster
-    dom_uuid = getDomainUUID(zkhandler, domain)
-    if not dom_uuid:
-        return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
-
-    # Verify that the VM is in a stopped state; freeing locks is not safe otherwise
-    state = zkhandler.read(("domain.state", dom_uuid))
-    if state != "start":
-        return (
-            False,
-            'ERROR: VM "{}" is not in started state; live-add unneccessary.'.format(
-                domain
-            ),
-        )
-
-    # Tell the cluster to attach the device
-    attach_device_string = "attach_device {} {}".format(dom_uuid, device_spec_xml)
-    zkhandler.write([("base.cmd.domain", attach_device_string)])
-    # Wait 1/2 second for the cluster to get the message and start working
-    time.sleep(0.5)
-    # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock("base.cmd.domain")
-    with lock:
-        try:
-            result = zkhandler.read("base.cmd.domain").split()[0]
-            if result == "success-attach_device":
-                message = 'Attached device on VM "{}"'.format(domain)
-                success = True
-            else:
-                message = 'ERROR: Failed to attach device on VM "{}"; check node logs for details.'.format(
-                    domain
-                )
-                success = False
-        except Exception:
-            message = "ERROR: Command ignored by node."
-            success = False
-
-    # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock("base.cmd.domain")
-    with lock:
-        time.sleep(0.5)
-        zkhandler.write([("base.cmd.domain", "")])
-
-    return success, message
-
-
-def detach_vm_device(zkhandler, domain, device_spec_xml):
-    # Validate that VM exists in cluster
-    dom_uuid = getDomainUUID(zkhandler, domain)
-    if not dom_uuid:
-        return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)
-
-    # Verify that the VM is in a stopped state; freeing locks is not safe otherwise
-    state = zkhandler.read(("domain.state", dom_uuid))
-    if state != "start":
-        return (
-            False,
-            'ERROR: VM "{}" is not in started state; live-add unneccessary.'.format(
-                domain
-            ),
-        )
-
-    # Tell the cluster to detach the device
-    detach_device_string = "detach_device {} {}".format(dom_uuid, device_spec_xml)
-    zkhandler.write([("base.cmd.domain", detach_device_string)])
-    # Wait 1/2 second for the cluster to get the message and start working
-    time.sleep(0.5)
-    # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock("base.cmd.domain")
-    with lock:
-        try:
-            result = zkhandler.read("base.cmd.domain").split()[0]
-            if result == "success-detach_device":
-                message = 'Attached device on VM "{}"'.format(domain)
-                success = True
-            else:
-                message = 'ERROR: Failed to detach device on VM "{}"; check node logs for details.'.format(
-                    domain
-                )
-                success = False
-        except Exception:
-            message = "ERROR: Command ignored by node."
-            success = False
-
-    # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock("base.cmd.domain")
-    with lock:
-        time.sleep(0.5)
-        zkhandler.write([("base.cmd.domain", "")])
-
-    return success, message
-
-
 def modify_vm_metadata(
     zkhandler,
     domain,
@@ -1843,3 +1752,163 @@ def restore_vm(zkhandler, domain, backup_path, datestring, retain_snapshot=False
     )
 
     return True, "\n".join(retlines)
+
+
+#
+# Celery worker tasks (must be run on node, outputs log messages to worker)
+#
+def vm_worker_helper_getdom(tuuid):
+    lv_conn = None
+    libvirt_uri = "qemu:///system"
+
+    # Convert (text) UUID into bytes
+    buuid = UUID(tuuid).bytes
+
+    try:
+        lv_conn = lvopen(libvirt_uri)
+        if lv_conn is None:
+            raise Exception("Failed to open local libvirt connection")
+
+        # Lookup the UUID
+        dom = lv_conn.lookupByUUID(buuid)
+    except Exception as e:
+        print(f"Error: {e}")
+        dom = None
+    finally:
+        if lv_conn is not None:
+            lv_conn.close()
+
+    return dom
+
+
+def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
+    start(
+        celery,
+        f"Flushing RBD locks for VM {domain} [forced={force_unlock}]",
+        current=1,
+        total=4,
+    )
+
+    dom_uuid = getDomainUUID(zkhandler, domain)
+
+    # Check that the domain is stopped (unless force_unlock is set)
+    domain_state = zkhandler.read(("domain.state", dom_uuid))
+    if not force_unlock and domain_state not in ["stop", "disable", "fail"]:
+        fail(
+            celery,
+            f"VM state {domain_state} not in [stop, disable, fail] and not forcing",
+        )
+        return
+
+    # Get the list of RBD images
+    rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
+
+    update(celery, f"Obtaining RBD locks for VM {domain}", current=2, total=4)
+
+    # Prepare a list of locks
+    rbd_locks = list()
+    for rbd in rbd_list:
+        # Check if a lock exists
+        (
+            lock_list_retcode,
+            lock_list_stdout,
+            lock_list_stderr,
+        ) = common.run_os_command(f"rbd lock list --format json {rbd}")
+
+        if lock_list_retcode != 0:
+            fail(
+                celery,
+                f"Failed to obtain lock list for volume {rbd}: {lock_list_stderr}",
+            )
+            return
+
+        try:
+            lock_list = jloads(lock_list_stdout)
+        except Exception as e:
+            fail(celery, f"Failed to parse JSON lock list for volume {rbd}: {e}")
+            return
+
+        if lock_list:
+            for lock in lock_list:
+                rbd_locks.append({"rbd": rbd, "lock": lock})
+
+    update(celery, f"Freeing RBD locks for VM {domain}", current=3, total=4)
+
+    for _lock in rbd_locks:
+        rbd = _lock["rbd"]
+        lock = _lock["lock"]
+
+        (
+            lock_remove_retcode,
+            lock_remove_stdout,
+            lock_remove_stderr,
+        ) = common.run_os_command(
+            f"rbd lock remove {rbd} \"{lock['id']}\" \"{lock['locker']}\""
+        )
+
+        if lock_remove_retcode != 0:
+            fail(
+                celery,
+                f"Failed to free RBD lock {lock['id']} on volume {rbd}: {lock_remove_stderr}",
+                current=3,
+                total=4,
+            )
+            return
+
+    return finish(
+        celery, f"Successfully flushed RBD locks for VM {domain}", current=4, total=4
+    )
+
+
+def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
+    start(celery, f"Hot-attaching XML device to VM {domain}")
+
+    dom_uuid = getDomainUUID(zkhandler, domain)
+
+    state = zkhandler.read(("domain.state", dom_uuid))
+    if state not in ["start"]:
+        fail(
+            celery,
+            f"VM {domain} not in start state; hot-attach unnecessary or impossible",
+        )
+        return
+
+    dom = vm_worker_helper_getdom(dom_uuid)
+    if dom is None:
+        fail(celery, f"Failed to find Libvirt object for VM {domain}")
+        return
+
+    try:
+        dom.attachDevice(xml_spec)
+    except Exception as e:
+        fail(celery, e)
+        return
+
device to VM {domain}") + + +def vm_worker_detach_device(zkhandler, celery, domain, xml_spec): + start(celery, f"Hot-detaching XML device from VM {domain}") + + dom_uuid = getDomainUUID(zkhandler, domain) + + state = zkhandler.read(("domain.state", dom_uuid)) + if state not in ["start"]: + fail( + celery, + f"VM {domain} not in start state; hot-detach unnecessary or impossible", + ) + return + + dom = vm_worker_helper_getdom(dom_uuid) + if dom is None: + fail(celery, f"Failed to find Libvirt object for VM {domain}") + return + + try: + dom.detachDevice(xml_spec) + except Exception as e: + fail(celery, e) + return + + return finish(celery, f"Successfully hot-detached XML device from VM {domain}") diff --git a/node-daemon/pvcnoded/objects/VMInstance.py b/node-daemon/pvcnoded/objects/VMInstance.py index bbcb0ca3..0d3e4d4f 100644 --- a/node-daemon/pvcnoded/objects/VMInstance.py +++ b/node-daemon/pvcnoded/objects/VMInstance.py @@ -28,8 +28,6 @@ from threading import Thread from xml.etree import ElementTree -from re import match - import daemon_lib.common as common import pvcnoded.objects.VMConsoleWatcherInstance as VMConsoleWatcherInstance @@ -1058,54 +1056,3 @@ class VMInstance(object): ) return True - - -# Primary command function -def vm_command(zkhandler, logger, this_node, data): - # Get the command and args - command, dom_uuid, *args = data.split() - - if match("success-.*", command) or match("failure-.*", command): - return - - logger.out( - 'Getting command "{}" for domain "{}"'.format(command, dom_uuid), state="i" - ) - - # Verify that the VM is set to run on this node - domain = this_node.d_domain.get(dom_uuid, None) - if domain is None: - return False - - if domain.getnode() != this_node.name: - return - - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.domain") - with zk_lock: - # Flushing VM RBD locks - if command == "flush_locks": - result = VMInstance.flush_locks(zkhandler, logger, dom_uuid, this_node) - # Attaching a device - elif command == "attach_device": - xml_spec = " ".join(args) - result = domain.attach_device(xml_spec) - # Detaching a device - elif command == "detach_device": - xml_spec = " ".join(args) - result = domain.detach_device(xml_spec) - # Command not defined - else: - result = False - - # Command succeeded - if result: - # Update the command queue - zkhandler.write([("base.cmd.domain", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.domain", "failure-{}".format(data))]) - - # Wait 1 seconds before we free the lock, to ensure the client hits the lock - time.sleep(1)