From 102c3c3106b027d24383542c605c8793e627e7ed Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface" <joshua@boniface.me>
Date: Thu, 30 Nov 2023 02:01:22 -0500
Subject: [PATCH] Port all Celery worker functions to discrete pkg

Moves all tasks run by the Celery worker into a discrete package/module
for easier installation. Also adjusts several parameters throughout to
accomplish this.
---
 api-daemon/pvcapid/flaskapi.py              |  17 +-
 api-daemon/pvcworkerd.service               |  16 --
 build-and-deploy.sh                         |   4 +-
 build-unstable-deb.sh                       |   4 +-
 bump-version                                |   3 +-
 .../pvcapid => daemon-common}/benchmark.py  |  38 +--
 .../pvcapid => daemon-common}/vmbuilder.py  |  12 +-
 debian/control                              |  12 +-
 debian/pvc-daemon-api.install               |   2 -
 debian/pvc-daemon-worker.install            |   4 +
 debian/pvc-daemon-worker.postinst           |  14 ++
 debian/pvc-daemon-worker.preinst            |   5 +
 debian/pvc-daemon-worker.prerm              |   4 +
 gen-api-doc                                 |   2 +-
 worker-daemon/daemon_lib                    |   1 +
 worker-daemon/pvcworkerd.py                 |  24 ++
 worker-daemon/pvcworkerd.service            |  20 ++
 {api-daemon => worker-daemon}/pvcworkerd.sh |   4 +-
 worker-daemon/pvcworkerd/Daemon.py          | 237 ++++++++++++++++++
 worker-daemon/pvcworkerd/__init__.py        |   0
 20 files changed, 352 insertions(+), 71 deletions(-)
 delete mode 100644 api-daemon/pvcworkerd.service
 rename {api-daemon/pvcapid => daemon-common}/benchmark.py (94%)
 mode change 100755 => 100644
 rename {api-daemon/pvcapid => daemon-common}/vmbuilder.py (99%)
 mode change 100755 => 100644
 create mode 100644 debian/pvc-daemon-worker.install
 create mode 100644 debian/pvc-daemon-worker.postinst
 create mode 100644 debian/pvc-daemon-worker.preinst
 create mode 100644 debian/pvc-daemon-worker.prerm
 create mode 120000 worker-daemon/daemon_lib
 create mode 100755 worker-daemon/pvcworkerd.py
 create mode 100644 worker-daemon/pvcworkerd.service
 rename {api-daemon => worker-daemon}/pvcworkerd.sh (78%)
 create mode 100755 worker-daemon/pvcworkerd/Daemon.py
 create mode 100644 worker-daemon/pvcworkerd/__init__.py

diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index dfca2340..014ebbf5 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -31,25 +31,12 @@ from uuid import uuid4
 from daemon_lib.common import getPrimaryNode
 from daemon_lib.zkhandler import ZKConnection
 from daemon_lib.node import get_list as get_node_list
-from daemon_lib.vm import (
-    vm_worker_flush_locks,
-    vm_worker_attach_device,
-    vm_worker_detach_device,
-)
-from daemon_lib.ceph import (
-    osd_worker_add_osd,
-    osd_worker_replace_osd,
-    osd_worker_refresh_osd,
-    osd_worker_remove_osd,
-    osd_worker_add_db_vg,
-)
+from daemon_lib.benchmark import list_benchmarks

 from pvcapid.Daemon import config, strtobool, API_VERSION

 import pvcapid.helper as api_helper
 import pvcapid.provisioner as api_provisioner
-import pvcapid.vmbuilder as api_vmbuilder
-import pvcapid.benchmark as api_benchmark
 import pvcapid.ova as api_ova

 from flask_sqlalchemy import SQLAlchemy
@@ -4191,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
                             type: string (integer)
                             description: The number of minor page faults during the test
         """
-        return api_benchmark.list_benchmarks(reqargs.get("job", None))
+        return list_benchmarks(config, reqargs.get("job", None))

     @RequestParser(
         [
diff --git a/api-daemon/pvcworkerd.service b/api-daemon/pvcworkerd.service
deleted file mode 100644
index 71127e12..00000000
--- a/api-daemon/pvcworkerd.service
+++ /dev/null
@@ -1,16 +0,0 @@
-# Parallel Virtual Cluster Celery Worker daemon unit file
-
-[Unit]
-Description = Parallel Virtual Cluster Celery Worker daemon
-After = network-online.target
-
-[Service]
-Type = simple
-WorkingDirectory = /usr/share/pvc
-Environment = PYTHONUNBUFFERED=true
-Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
-ExecStart = /usr/share/pvc/pvcworkerd.sh
-Restart = on-failure
-
-[Install]
-WantedBy = multi-user.target
diff --git a/build-and-deploy.sh b/build-and-deploy.sh
index 31c3a16c..d88598c4 100755
--- a/build-and-deploy.sh
+++ b/build-and-deploy.sh
@@ -42,7 +42,7 @@ echo " done. Package version ${version}."

 # Install the client(s) locally
 echo -n "Installing client packages locally..."
-$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null
+$SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
 echo " done".

 for HOST in ${HOSTS[@]}; do
@@ -59,7 +59,7 @@ fi
 for HOST in ${HOSTS[@]}; do
     echo "> Deploying packages to host ${HOST}"
     echo -n "Installing packages..."
-    ssh $HOST $SUDO dpkg -i /tmp/pvc/*.deb &>/dev/null
+    ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
     ssh $HOST rm -rf /tmp/pvc &>/dev/null
     echo " done."
     echo -n "Restarting PVC daemons..."
diff --git a/build-unstable-deb.sh b/build-unstable-deb.sh
index 146cf5fd..0ba1fb2f 100755
--- a/build-unstable-deb.sh
+++ b/build-unstable-deb.sh
@@ -14,9 +14,10 @@ tmpdir=$( mktemp -d )
 cp -a debian/changelog client-cli/setup.py ${tmpdir}/
 cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
 cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
+cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
 cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
 # Replace the "base" version with the git revision version
-sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
+sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
 sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
 cat <<EOF > debian/changelog
 pvc (${new_ver}) unstable; urgency=medium
@@ -35,6 +36,7 @@ cp -a ${tmpdir}/changelog debian/changelog
 cp -a ${tmpdir}/setup.py client-cli/setup.py
 cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
 cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
+cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
 cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py

 # Clean up
diff --git a/bump-version b/bump-version
index 6225bf40..68b90838 100755
--- a/bump-version
+++ b/bump-version
@@ -21,6 +21,7 @@ rm ${changelog_file}

 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvchealthd/Daemon.py
+sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," worker-daemon/pvcworkerd/Daemon.py
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
 sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
 echo ${new_version} > .version
@@ -47,7 +48,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
 echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
 mv ${deb_changelog_file} debian/changelog

-git add node-daemon/pvcnoded/Daemon.py headlth-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
+git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
 git commit -v

 popd &>/dev/null
diff --git a/api-daemon/pvcapid/benchmark.py b/daemon-common/benchmark.py
old mode 100755
new mode 100644
similarity index 94%
rename from api-daemon/pvcapid/benchmark.py
rename to daemon-common/benchmark.py
index 7c617e77..3c2a5e9a
--- a/api-daemon/pvcapid/benchmark.py
+++ b/daemon-common/benchmark.py
@@ -25,9 +25,6 @@ import psycopg2.extras
 from datetime import datetime
 from json import loads, dumps

-from pvcapid.Daemon import config
-
-from daemon_lib.zkhandler import ZKHandler
 from daemon_lib.celery import start, fail, log_info, update, finish

 import daemon_lib.common as pvc_common
@@ -137,7 +134,7 @@ def open_database(config):
     conn = psycopg2.connect(
         host=config["api_postgresql_host"],
         port=config["api_postgresql_port"],
-        dbname=config["api_postgresql_name"],
+        dbname=config["api_postgresql_dbname"],
         user=config["api_postgresql_user"],
         password=config["api_postgresql_password"],
     )
@@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
     conn.close()


-def list_benchmarks(job=None):
+def list_benchmarks(config, job=None):
     if job is not None:
         query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
         args = (job,)
@@ -278,17 +275,8 @@ def run_benchmark_job(
     return jstdout


-def run_benchmark(self, pool):
+def worker_run_benchmark(zkhandler, celery, config, pool):
     # Phase 0 - connect to databases
-    try:
-        zkhandler = ZKHandler(config)
-        zkhandler.connect()
-    except Exception:
-        fail(
-            self,
-            "Failed to connect to Zookeeper",
-        )
-
     cur_time = datetime.now().isoformat(timespec="seconds")
     cur_primary = zkhandler.read("base.config.primary_node")
     job_name = f"{cur_time}_{cur_primary}"
@@ -296,7 +284,7 @@ def run_benchmark(self, pool):
     current_stage = 0
     total_stages = 13
     start(
-        self,
+        celery,
         f"Running storage benchmark '{job_name}' on pool '{pool}'",
         current=current_stage,
         total=total_stages,
@@ -312,13 +300,13 @@ def run_benchmark(self, pool):
             zkhandler=zkhandler,
         )
         fail(
-            self,
+            celery,
            "Failed to connect to Postgres",
         )

     current_stage += 1
     update(
-        self,
+        celery,
         "Storing running status in database",
         current=current_stage,
         total=total_stages,
@@ -340,11 +328,11 @@ def run_benchmark(self, pool):
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
-        fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
+        fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)

     current_stage += 1
     update(
-        self,
+        celery,
         "Creating benchmark volume",
         current=current_stage,
         total=total_stages,
@@ -363,7 +351,7 @@ def run_benchmark(self, pool):
     for test in test_matrix:
         current_stage += 1
         update(
-            self,
+            celery,
             f"Running benchmark job '{test}'",
             current=current_stage,
             total=total_stages,
@@ -381,7 +369,7 @@ def run_benchmark(self, pool):
     # Phase 3 - cleanup
     current_stage += 1
     update(
-        self,
+        celery,
         "Cleaning up benchmark volume",
         current=current_stage,
         total=total_stages,
@@ -397,7 +385,7 @@ def run_benchmark(self, pool):

     current_stage += 1
     update(
-        self,
+        celery,
         "Storing results in database",
         current=current_stage,
         total=total_stages,
@@ -415,7 +403,7 @@ def run_benchmark(self, pool):
             db_cur=db_cur,
             zkhandler=zkhandler,
         )
-        fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
+        fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)

     cleanup(
         job_name,
@@ -426,7 +414,7 @@ def run_benchmark(self, pool):
 
     current_stage += 1
     return finish(
-        self,
+        celery,
         f"Storage benchmark {job_name} completed successfully",
         current=current_stage,
         total=total_stages,
diff --git a/api-daemon/pvcapid/vmbuilder.py b/daemon-common/vmbuilder.py
old mode 100755
new mode 100644
similarity index 99%
rename from api-daemon/pvcapid/vmbuilder.py
rename to daemon-common/vmbuilder.py
index 830ef362..34c0e61a
--- a/api-daemon/pvcapid/vmbuilder.py
+++ b/daemon-common/vmbuilder.py
@@ -31,8 +31,6 @@ import uuid

 from contextlib import contextmanager

-from pvcapid.Daemon import config
-
 from daemon_lib.zkhandler import ZKHandler

 from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
@@ -216,8 +214,14 @@ def open_zk(config):
 #
 # Main VM provisioning function - executed by the Celery worker
 #
-def create_vm(
-    celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
+def worker_create_vm(
+    celery,
+    config,
+    vm_name,
+    vm_profile,
+    define_vm=True,
+    start_vm=True,
+    script_run_args=[],
 ):
     current_stage = 0
     total_stages = 11
diff --git a/debian/control b/debian/control
index f642bd2d..193a2721 100644
--- a/debian/control
+++ b/debian/control
@@ -4,7 +4,7 @@ Priority: optional
 Maintainer: Joshua Boniface <joshua@boniface.me>
 Standards-Version: 3.9.8
 Homepage: https://www.boniface.me
-X-Python3-Version: >= 3.2
+X-Python3-Version: >= 3.7

 Package: pvc-daemon-node
 Architecture: all
@@ -22,9 +22,17 @@ Description: Parallel Virtual Cluster health daemon
  .
  This package installs the PVC health monitoring daemon

+Package: pvc-daemon-worker
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
+Description: Parallel Virtual Cluster worker daemon
+ A KVM/Zookeeper/Ceph-based VM and private cloud manager
+ .
+ This package installs the PVC Celery task worker daemon
+
 Package: pvc-daemon-api
 Architecture: all
-Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
+Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
 Description: Parallel Virtual Cluster API daemon
  A KVM/Zookeeper/Ceph-based VM and private cloud manager
  .
diff --git a/debian/pvc-daemon-api.install b/debian/pvc-daemon-api.install
index 16adaa99..7a031484 100644
--- a/debian/pvc-daemon-api.install
+++ b/debian/pvc-daemon-api.install
@@ -3,7 +3,5 @@ api-daemon/pvcapid-manage*.py usr/share/pvc
 api-daemon/pvc-api-db-upgrade usr/share/pvc
 api-daemon/pvcapid usr/share/pvc
 api-daemon/pvcapid.service lib/systemd/system
-api-daemon/pvcworkerd.service lib/systemd/system
-api-daemon/pvcworkerd.sh usr/share/pvc
 api-daemon/provisioner usr/share/pvc
 api-daemon/migrations usr/share/pvc
diff --git a/debian/pvc-daemon-worker.install b/debian/pvc-daemon-worker.install
new file mode 100644
index 00000000..974071a2
--- /dev/null
+++ b/debian/pvc-daemon-worker.install
@@ -0,0 +1,4 @@
+worker-daemon/pvcworkerd.sh usr/share/pvc
+worker-daemon/pvcworkerd.py usr/share/pvc
+worker-daemon/pvcworkerd usr/share/pvc
+worker-daemon/pvcworkerd.service lib/systemd/system
diff --git a/debian/pvc-daemon-worker.postinst b/debian/pvc-daemon-worker.postinst
new file mode 100644
index 00000000..c5b07411
--- /dev/null
+++ b/debian/pvc-daemon-worker.postinst
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+# Reload systemd's view of the units
+systemctl daemon-reload
+
+# Enable the service and target
+systemctl enable /lib/systemd/system/pvcworkerd.service
+
+# Inform administrator of the service restart/startup not occurring automatically
+if systemctl is-active --quiet pvcworkerd.service; then
+    echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
+else
+    echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
+fi
diff --git a/debian/pvc-daemon-worker.preinst b/debian/pvc-daemon-worker.preinst
new file mode 100644
index 00000000..93b5fce8
--- /dev/null
+++ b/debian/pvc-daemon-worker.preinst
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; >/dev/null 2>&1 || true
diff --git a/debian/pvc-daemon-worker.prerm b/debian/pvc-daemon-worker.prerm
new file mode 100644
index 00000000..0f1fe6e1
--- /dev/null
+++ b/debian/pvc-daemon-worker.prerm
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+# Disable the service
+systemctl disable pvcworkerd.service
diff --git a/gen-api-doc b/gen-api-doc
index 57199622..c2da5643 100755
--- a/gen-api-doc
+++ b/gen-api-doc
@@ -8,7 +8,7 @@ import os
 import sys
 import json

-os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
+os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"

 sys.path.append('api-daemon')
diff --git a/worker-daemon/daemon_lib b/worker-daemon/daemon_lib
new file mode 120000
index 00000000..9df2c591
--- /dev/null
+++ b/worker-daemon/daemon_lib
@@ -0,0 +1 @@
+../daemon-common
\ No newline at end of file
diff --git a/worker-daemon/pvcworkerd.py b/worker-daemon/pvcworkerd.py
new file mode 100755
index 00000000..b3b92ae5
--- /dev/null
+++ b/worker-daemon/pvcworkerd.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python3
+
+# pvcworkerd.py - Worker daemon startup stub
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import pvcworkerd.Daemon  # noqa: F401
+
+pvcworkerd.Daemon.entrypoint()
diff --git a/worker-daemon/pvcworkerd.service b/worker-daemon/pvcworkerd.service
new file mode 100644
index 00000000..36487e91
--- /dev/null
+++ b/worker-daemon/pvcworkerd.service
@@ -0,0 +1,20 @@
+# Parallel Virtual Cluster worker daemon unit file
+
+[Unit]
+Description = Parallel Virtual Cluster worker daemon
+After = network.target
+Wants = network-online.target
+PartOf = pvc.target
+
+[Service]
+Type = simple
+WorkingDirectory = /usr/share/pvc
+Environment = PYTHONUNBUFFERED=true
+Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
+ExecStartPre = /bin/sleep 2
+ExecStart = /usr/share/pvc/pvcworkerd.sh
+ExecStopPost = /bin/sleep 2
+Restart = on-failure
+
+[Install]
+WantedBy = pvc.target
diff --git a/api-daemon/pvcworkerd.sh b/worker-daemon/pvcworkerd.sh
similarity index 78%
rename from api-daemon/pvcworkerd.sh
rename to worker-daemon/pvcworkerd.sh
index 8b33606b..377f2ab0 100755
--- a/api-daemon/pvcworkerd.sh
+++ b/worker-daemon/pvcworkerd.sh
@@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
 # app arguments work in a non-backwards-compatible way with Celery 5.
 case "$( cat /etc/debian_version )" in
     10.*)
-        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
+        CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
     ;;
     *)
-        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
+        CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
     ;;
 esac

diff --git a/worker-daemon/pvcworkerd/Daemon.py b/worker-daemon/pvcworkerd/Daemon.py
new file mode 100755
index 00000000..b0666756
--- /dev/null
+++ b/worker-daemon/pvcworkerd/Daemon.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+
+# Daemon.py - PVC Node Worker daemon
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+from celery import Celery
+
+import daemon_lib.config as cfg
+
+from daemon_lib.zkhandler import ZKConnection
+from daemon_lib.vm import (
+    vm_worker_flush_locks,
+    vm_worker_attach_device,
+    vm_worker_detach_device,
+)
+from daemon_lib.ceph import (
+    osd_worker_add_osd,
+    osd_worker_replace_osd,
+    osd_worker_refresh_osd,
+    osd_worker_remove_osd,
+    osd_worker_add_db_vg,
+)
+from daemon_lib.benchmark import (
+    worker_run_benchmark,
+)
+from daemon_lib.vmbuilder import (
+    worker_create_vm,
+)
+
+
+version = "0.9.82"
+
+
+config = cfg.get_configuration()
+config["daemon_name"] = "pvcworkerd"
+config["daemon_version"] = version
+
+
+celery_task_uri = "redis://{}:{}{}".format(
+    config["keydb_host"], config["keydb_port"], config["keydb_path"]
+)
+celery = Celery(
+    "pvcworkerd",
+    broker=celery_task_uri,
+    result_backend=celery_task_uri,
+    result_extended=True,
+)
+
+
+#
+# Job functions
+#
+@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
+def create_vm(
+    self,
+    vm_name=None,
+    profile_name=None,
+    define_vm=True,
+    start_vm=True,
+    script_run_args=[],
+    run_on="primary",
+):
+    return worker_create_vm(
+        self,
+        config,
+        vm_name,
+        profile_name,
+        define_vm=define_vm,
+        start_vm=start_vm,
+        script_run_args=script_run_args,
+    )
+
+
+@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
+def storage_benchmark(self, pool=None, run_on="primary"):
+    @ZKConnection(config)
+    def run_storage_benchmark(zkhandler, self, pool):
+        return worker_run_benchmark(zkhandler, self, config, pool)
+
+    return run_storage_benchmark(self, pool)
+
+
+@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
+def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
+    @ZKConnection(config)
+    def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
+        return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
+
+    return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
+
+
+@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
+def vm_device_attach(self, domain=None, xml=None, run_on=None):
+    @ZKConnection(config)
+    def run_vm_device_attach(zkhandler, self, domain, xml):
+        return vm_worker_attach_device(zkhandler, self, domain, xml)
+
+    return run_vm_device_attach(self, domain, xml)
+
+
+@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
+def vm_device_detach(self, domain=None, xml=None, run_on=None):
+    @ZKConnection(config)
+    def run_vm_device_detach(zkhandler, self, domain, xml):
+        return vm_worker_detach_device(zkhandler, self, domain, xml)
+
+    return run_vm_device_detach(self, domain, xml)
+
+
+@celery.task(name="osd.add", bind=True, routing_key="run_on")
+def osd_add(
+    self,
+    device=None,
+    weight=None,
+    ext_db_ratio=None,
+    ext_db_size=None,
+    split_count=None,
+    run_on=None,
+):
+    @ZKConnection(config)
+    def run_osd_add(
+        zkhandler,
+        self,
+        run_on,
+        device,
+        weight,
+        ext_db_ratio=None,
+        ext_db_size=None,
+        split_count=None,
+    ):
+        return osd_worker_add_osd(
+            zkhandler,
+            self,
+            run_on,
+            device,
+            weight,
+            ext_db_ratio,
+            ext_db_size,
+            split_count,
+        )
+
+    return run_osd_add(
+        self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
+    )
+
+
+@celery.task(name="osd.replace", bind=True, routing_key="run_on")
+def osd_replace(
+    self,
+    osd_id=None,
+    new_device=None,
+    old_device=None,
+    weight=None,
+    ext_db_ratio=None,
+    ext_db_size=None,
+    run_on=None,
+):
+    @ZKConnection(config)
+    def run_osd_replace(
+        zkhandler,
+        self,
+        run_on,
+        osd_id,
+        new_device,
+        old_device=None,
+        weight=None,
+        ext_db_ratio=None,
+        ext_db_size=None,
+    ):
+        return osd_worker_replace_osd(
+            zkhandler,
+            self,
+            run_on,
+            osd_id,
+            new_device,
+            old_device,
+            weight,
+            ext_db_ratio,
+            ext_db_size,
+        )
+
+    return run_osd_replace(
+        self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
+    )
+
+
+@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
+def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
+    @ZKConnection(config)
+    def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
+        return osd_worker_refresh_osd(
+            zkhandler, self, run_on, osd_id, device, ext_db_flag
+        )
+
+    return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
+
+
+@celery.task(name="osd.remove", bind=True, routing_key="run_on")
+def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
+    @ZKConnection(config)
+    def run_osd_remove(
+        zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
+    ):
+        return osd_worker_remove_osd(
+            zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
+        )
+
+    return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
+
+
+@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
+def osd_add_db_vg(self, device=None, run_on=None):
+    @ZKConnection(config)
+    def run_osd_add_db_vg(zkhandler, self, run_on, device):
+        return osd_worker_add_db_vg(zkhandler, self, run_on, device)
+
+    return run_osd_add_db_vg(self, run_on, device)
+
+
+def entrypoint():
+    pass
diff --git a/worker-daemon/pvcworkerd/__init__.py b/worker-daemon/pvcworkerd/__init__.py
new file mode 100644
index 00000000..e69de29b
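
Notes on the ported interfaces follow; the sketches below are illustrative only and are not part of the patch.

Each task in pvcworkerd/Daemon.py wraps a daemon_lib worker function and obtains its Zookeeper connection through the @ZKConnection(config) decorator, replacing the manual ZKHandler construction that run_benchmark() previously did itself. A rough approximation of what such a decorator does, assuming the daemon_lib.zkhandler names shown in the diff (the real implementation may differ in detail):

from functools import wraps

from daemon_lib.zkhandler import ZKHandler


def ZKConnection(config):
    # Decorator: open a Zookeeper session, pass the handler as the wrapped
    # function's first argument, and always tear the session down afterwards.
    def decorator(function):
        @wraps(function)
        def connection(*args, **kwargs):
            zkhandler = ZKHandler(config)
            zkhandler.connect()
            try:
                return function(zkhandler, *args, **kwargs)
            finally:
                zkhandler.disconnect()

        return connection

    return decorator

This is why worker_run_benchmark() now takes zkhandler as its first argument instead of building one: the connection lifecycle lives in one place, and the worker functions can be exercised with any handler-like object.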
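The ported worker functions also share a stage-reporting contract: start() once, update() per stage with current/total counters, fail() on error, and finish() to return. A minimal sketch of a new worker function following that contract, using the daemon_lib.celery helpers imported in benchmark.py (the no-op body is invented for illustration):

from daemon_lib.celery import start, update, finish


def worker_noop(zkhandler, celery, config):
    current_stage = 0
    total_stages = 2
    start(celery, "Starting no-op job", current=current_stage, total=total_stages)

    current_stage += 1
    update(celery, "Doing nothing", current=current_stage, total=total_stages)

    current_stage += 1
    return finish(
        celery, "No-op job completed", current=current_stage, total=total_stages
    )

The bound Celery task passes itself in as the celery argument (note bind=True on every @celery.task decorator above), so the helpers can publish progress on the task instance.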
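Finally, because the tasks now live in pvcworkerd rather than pvcapid, callers can no longer import them directly; they dispatch by registered task name instead. Each worker subscribes to a queue named after its short hostname (--queues $(hostname -s) in pvcworkerd.sh), so targeting a node means picking its queue. A hypothetical API-side caller, with an invented broker URI (the real one is assembled from the keydb_* config values, as in Daemon.py):

from celery import Celery

# Assumed URI for illustration; pvcapid builds its own from pvc.conf.
task_uri = "redis://localhost:6379/1"
app = Celery("pvcapid", broker=task_uri, backend=task_uri)


def flush_vm_locks_on(node, domain):
    # Dispatch by task name to the node's queue; the run_on kwarg mirrors
    # the argument each task signature expects.
    result = app.send_task(
        "vm.flush_locks",
        kwargs={"domain": domain, "force_unlock": False, "run_on": node},
        queue=node,
    )
    return result.get(timeout=300)  # block until the worker finishes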