Compare commits
28 Commits: 74a416165d ... v0.9.83

| Author | SHA1 | Date |
|---|---|---|
| | 988de1218f | |
| | 0ffcbf3152 | |
| | ad8d8cf7a7 | |
| | 915a84ee3c | |
| | 6315a068d1 | |
| | 2afd064445 | |
| | 7cb9ebae6b | |
| | 1fb0463dea | |
| | 13549fc995 | |
| | 102c3c3106 | |
| | 0c0fb65c62 | |
| | 03a738f878 | |
| | 4df5fdbca6 | |
| | 97eb63ebab | |
| | 4a2eba0961 | |
| | 077dd8708f | |
| | b6b5786c3b | |
| | ad738dec40 | |
| | d2b764a2c7 | |
| | b8aecd9c83 | |
| | 11db3c5b20 | |
| | 7a7c975eff | |
| | fa12a3c9b1 | |
| | 787f4216b3 | |
| | 647cba3cf5 | |
| | 921ecb3a05 | |
| | 6a68cf665b | |
| | 41f4e4fb2f | |
12
CHANGELOG.md
@ -1,5 +1,17 @@

## PVC Changelog

###### [v0.9.83](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.83)

**Breaking Changes:** This release features a breaking change to the daemon configuration. A new unified "pvc.conf" file is required for all daemons (and for the CLI client's Autobackup and API-on-this-host functionality); it is written by the "pvc" role in the PVC Ansible framework. Using the "update-pvc-daemons" oneshot playbook from PVC Ansible is **required** to update to this release, as it ensures this file is written to the proper place before deploying the new package versions, and also ensures that the old entries are cleaned up afterwards. In addition, this release fully splits the node worker and health subsystems into discrete daemons ("pvcworkerd" and "pvchealthd") and packages ("pvc-daemon-worker" and "pvc-daemon-health") respectively. The "pvc-daemon-node" package now depends on both packages, and the "pvc-daemon-api" package can now be reliably used outside of the PVC nodes themselves (for instance, in a VM) without any strange cross-dependency issues.

* [All] Unifies all daemon (and on-node CLI task) configuration into a single "pvc.conf" YAML configuration file.
* [All] Splits the node worker subsystem into a discrete codebase and package ("pvc-daemon-worker"), still named "pvcworkerd".
* [All] Splits the node health subsystem into a discrete codebase and package ("pvc-daemon-health"), named "pvchealthd".
* [All] Improves Zookeeper node logging to avoid bugs and to support multiple simultaneous daemon writes.
* [All] Fixes several bugs in file logging and splits file logs by daemon.
* [Node Daemon] Improves several log messages to match the new standards of the Health daemon.
* [API Daemon] Reworks Celery task routing and handling to move all worker tasks to the Worker daemon.
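The unified "pvc.conf" described above is resolved the same way by every daemon: the systemd units export `PVC_CONFIG_FILE=/etc/pvc/pvc.conf` and the daemon parses that file as YAML (see the loader changes later in this diff). Below is a minimal sketch of that flow, assuming only the standard PyYAML API; the helper name and the fallback path are illustrative, not the project's actual loader.

```python
# Illustrative sketch only, not PVC's real loader: locate the unified config
# the way the daemons in this release do (via PVC_CONFIG_FILE, which the
# systemd units set to /etc/pvc/pvc.conf) and parse it as YAML.
import os

import yaml


def load_pvc_conf(default_path="/etc/pvc/pvc.conf"):
    # Prefer the environment variable; fall back to the packaged default path
    config_file = os.environ.get("PVC_CONFIG_FILE", default_path)
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"PVC configuration not found at {config_file}")
    with open(config_file, "r") as fh:
        return yaml.load(fh, Loader=yaml.SafeLoader)


if __name__ == "__main__":
    config = load_pvc_conf()
    print(config.get("cluster", {}).get("all_nodes"))
```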
###### [v0.9.82](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.82)

* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).
@ -19,15 +19,15 @@
#
###############################################################################

import os
import yaml

from ssl import SSLContext, TLSVersion

from distutils.util import strtobool as dustrtobool

import daemon_lib.config as cfg

# Daemon version
version = "0.9.82"
version = "0.9.83"

# API version
API_VERSION = 1.0
@ -53,160 +53,13 @@ def strtobool(stringv):
|
||||
# Configuration Parsing
|
||||
##########################################################
|
||||
|
||||
# Parse the configuration file
|
||||
config_file = None
|
||||
try:
|
||||
_config_file = "/etc/pvc/pvcapid.yaml"
|
||||
if not os.path.exists(_config_file):
|
||||
raise
|
||||
config_file = _config_file
|
||||
config_type = "legacy"
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
_config_file = os.environ["PVC_CONFIG_FILE"]
|
||||
if not os.path.exists(_config_file):
|
||||
raise
|
||||
config_file = _config_file
|
||||
config_type = "current"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not config_file:
|
||||
print(
|
||||
'Error: The "PVC_CONFIG_FILE" environment variable must be set before starting pvcapid.'
|
||||
)
|
||||
exit(1)
|
||||
# Get our configuration
|
||||
config = cfg.get_configuration()
|
||||
config["daemon_name"] = "pvcapid"
|
||||
config["daemon_version"] = version
|
||||
|
||||
|
||||
def load_configuration_file(config_file):
|
||||
print('Loading configuration from file "{}"'.format(config_file))
|
||||
|
||||
# Read in the config
|
||||
try:
|
||||
with open(config_file, "r") as cfgfile:
|
||||
o_config = yaml.load(cfgfile, Loader=yaml.BaseLoader)
|
||||
except Exception as e:
|
||||
print("ERROR: Failed to parse configuration file: {}".format(e))
|
||||
exit(1)
|
||||
|
||||
return o_config
|
||||
|
||||
|
||||
def get_configuration_current(config_file):
|
||||
o_config = load_configuration_file(config_file)
|
||||
try:
|
||||
# Create the config object
|
||||
config = {
|
||||
"debug": strtobool(o_config["logging"].get("debug_logging", "False")),
|
||||
"all_nodes": o_config["cluster"]["all_nodes"],
|
||||
"coordinators": o_config["cluster"]["coordinator_nodes"],
|
||||
"listen_address": o_config["api"]["listen"]["address"],
|
||||
"listen_port": int(o_config["api"]["listen"]["port"]),
|
||||
"auth_enabled": strtobool(
|
||||
o_config["api"]["authentication"].get("enabled", "False")
|
||||
),
|
||||
"auth_secret_key": o_config["api"]["authentication"]["secret_key"],
|
||||
"auth_source": o_config["api"]["authentication"]["source"],
|
||||
"ssl_enabled": strtobool(o_config["api"]["ssl"].get("enabled", "False")),
|
||||
"ssl_cert_file": o_config["api"]["ssl"]["certificate"],
|
||||
"ssl_key_file": o_config["api"]["ssl"]["private_key"],
|
||||
"database_port": o_config["database"]["postgres"]["port"],
|
||||
"database_host": o_config["database"]["postgres"]["hostname"],
|
||||
"database_name": o_config["database"]["postgres"]["credentials"]["api"][
|
||||
"database"
|
||||
],
|
||||
"database_user": o_config["database"]["postgres"]["credentials"]["api"][
|
||||
"username"
|
||||
],
|
||||
"database_password": o_config["database"]["postgres"]["credentials"]["api"][
|
||||
"password"
|
||||
],
|
||||
"queue_port": o_config["database"]["keydb"]["port"],
|
||||
"queue_host": o_config["database"]["keydb"]["hostname"],
|
||||
"queue_path": o_config["database"]["keydb"]["path"],
|
||||
"storage_domain": o_config["cluster"]["networks"]["storage"]["domain"],
|
||||
"storage_hosts": o_config["ceph"].get("monitor_hosts", None),
|
||||
"ceph_monitor_port": o_config["ceph"]["monitor_port"],
|
||||
"ceph_storage_secret_uuid": o_config["ceph"]["secret_uuid"],
|
||||
}
|
||||
|
||||
# Use coordinators as storage hosts if not explicitly specified
|
||||
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
|
||||
config["storage_hosts"] = config["coordinators"]
|
||||
|
||||
# Set up our token list if specified
|
||||
if config["auth_source"] == "token":
|
||||
config["auth_tokens"] = o_config["api"]["token"]
|
||||
else:
|
||||
if config["auth_enabled"]:
|
||||
print(
|
||||
"WARNING: No authentication method provided; disabling authentication."
|
||||
)
|
||||
config["auth_enabled"] = False
|
||||
|
||||
except Exception as e:
|
||||
print(f"ERROR: Failed to load configuration: {e}")
|
||||
exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_configuration_legacy(config_file):
|
||||
o_config = load_configuration_file(config_file)
|
||||
try:
|
||||
# Create the config object
|
||||
config = {
|
||||
"debug": strtobool(o_config["pvc"]["debug"]),
|
||||
"coordinators": o_config["pvc"]["coordinators"],
|
||||
"listen_address": o_config["pvc"]["api"]["listen_address"],
|
||||
"listen_port": int(o_config["pvc"]["api"]["listen_port"]),
|
||||
"auth_enabled": strtobool(
|
||||
o_config["pvc"]["api"]["authentication"]["enabled"]
|
||||
),
|
||||
"auth_secret_key": o_config["pvc"]["api"]["authentication"]["secret_key"],
|
||||
"auth_tokens": o_config["pvc"]["api"]["authentication"]["tokens"],
|
||||
"ssl_enabled": strtobool(o_config["pvc"]["api"]["ssl"]["enabled"]),
|
||||
"ssl_key_file": o_config["pvc"]["api"]["ssl"]["key_file"],
|
||||
"ssl_cert_file": o_config["pvc"]["api"]["ssl"]["cert_file"],
|
||||
"database_host": o_config["pvc"]["provisioner"]["database"]["host"],
|
||||
"database_port": int(o_config["pvc"]["provisioner"]["database"]["port"]),
|
||||
"database_name": o_config["pvc"]["provisioner"]["database"]["name"],
|
||||
"database_user": o_config["pvc"]["provisioner"]["database"]["user"],
|
||||
"database_password": o_config["pvc"]["provisioner"]["database"]["pass"],
|
||||
"queue_host": o_config["pvc"]["provisioner"]["queue"]["host"],
|
||||
"queue_port": o_config["pvc"]["provisioner"]["queue"]["port"],
|
||||
"queue_path": o_config["pvc"]["provisioner"]["queue"]["path"],
|
||||
"storage_hosts": o_config["pvc"]["provisioner"]["ceph_cluster"][
|
||||
"storage_hosts"
|
||||
],
|
||||
"storage_domain": o_config["pvc"]["provisioner"]["ceph_cluster"][
|
||||
"storage_domain"
|
||||
],
|
||||
"ceph_monitor_port": o_config["pvc"]["provisioner"]["ceph_cluster"][
|
||||
"ceph_monitor_port"
|
||||
],
|
||||
"ceph_storage_secret_uuid": o_config["pvc"]["provisioner"]["ceph_cluster"][
|
||||
"ceph_storage_secret_uuid"
|
||||
],
|
||||
}
|
||||
|
||||
# Use coordinators as storage hosts if not explicitly specified
|
||||
if not config["storage_hosts"]:
|
||||
config["storage_hosts"] = config["coordinators"]
|
||||
|
||||
except Exception as e:
|
||||
print("ERROR: Failed to load configuration: {}".format(e))
|
||||
exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
if config_type == "legacy":
|
||||
config = get_configuration_legacy(config_file)
|
||||
else:
|
||||
config = get_configuration_current(config_file)
|
||||
|
||||
##########################################################
|
||||
# Entrypoint
|
||||
##########################################################
|
||||
@ -215,41 +68,43 @@ else:
|
||||
def entrypoint():
|
||||
import pvcapid.flaskapi as pvc_api # noqa: E402
|
||||
|
||||
if config["ssl_enabled"]:
|
||||
if config["api_ssl_enabled"]:
|
||||
context = SSLContext()
|
||||
context.minimum_version = TLSVersion.TLSv1
|
||||
context.get_ca_certs()
|
||||
context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"])
|
||||
context.load_cert_chain(
|
||||
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
|
||||
)
|
||||
else:
|
||||
context = None
|
||||
|
||||
# Print our startup messages
|
||||
print("")
|
||||
print("|------------------------------------------------------------|")
|
||||
print("| |")
|
||||
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
||||
print("| ██ ▜█▙ ▟█▛ ██ |")
|
||||
print("| ███████████ ▜█▙ ▟█▛ ██ |")
|
||||
print("| ██ ▜█▙▟█▛ ███████████ |")
|
||||
print("| |")
|
||||
print("|------------------------------------------------------------|")
|
||||
print("| Parallel Virtual Cluster API daemon v{0: <21} |".format(version))
|
||||
print("| Debug: {0: <51} |".format(str(config["debug"])))
|
||||
print("| API version: v{0: <44} |".format(API_VERSION))
|
||||
print("|--------------------------------------------------------------|")
|
||||
print("| |")
|
||||
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
||||
print("| ██ ▜█▙ ▟█▛ ██ |")
|
||||
print("| ███████████ ▜█▙ ▟█▛ ██ |")
|
||||
print("| ██ ▜█▙▟█▛ ███████████ |")
|
||||
print("| |")
|
||||
print("|--------------------------------------------------------------|")
|
||||
print("| Parallel Virtual Cluster API daemon v{0: <23} |".format(version))
|
||||
print("| Debug: {0: <53} |".format(str(config["debug"])))
|
||||
print("| API version: v{0: <46} |".format(API_VERSION))
|
||||
print(
|
||||
"| Listen: {0: <50} |".format(
|
||||
"{}:{}".format(config["listen_address"], config["listen_port"])
|
||||
"| Listen: {0: <52} |".format(
|
||||
"{}:{}".format(config["api_listen_address"], config["api_listen_port"])
|
||||
)
|
||||
)
|
||||
print("| SSL: {0: <53} |".format(str(config["ssl_enabled"])))
|
||||
print("| Authentication: {0: <42} |".format(str(config["auth_enabled"])))
|
||||
print("|------------------------------------------------------------|")
|
||||
print("| SSL: {0: <55} |".format(str(config["api_ssl_enabled"])))
|
||||
print("| Authentication: {0: <44} |".format(str(config["api_auth_enabled"])))
|
||||
print("|--------------------------------------------------------------|")
|
||||
print("")
|
||||
|
||||
pvc_api.celery_startup()
|
||||
pvc_api.app.run(
|
||||
config["listen_address"],
|
||||
config["listen_port"],
|
||||
config["api_listen_address"],
|
||||
config["api_listen_port"],
|
||||
threaded=True,
|
||||
ssl_context=context,
|
||||
)
|
||||
|
@ -31,25 +31,12 @@ from uuid import uuid4
|
||||
from daemon_lib.common import getPrimaryNode
|
||||
from daemon_lib.zkhandler import ZKConnection
|
||||
from daemon_lib.node import get_list as get_node_list
|
||||
from daemon_lib.vm import (
|
||||
vm_worker_flush_locks,
|
||||
vm_worker_attach_device,
|
||||
vm_worker_detach_device,
|
||||
)
|
||||
from daemon_lib.ceph import (
|
||||
osd_worker_add_osd,
|
||||
osd_worker_replace_osd,
|
||||
osd_worker_refresh_osd,
|
||||
osd_worker_remove_osd,
|
||||
osd_worker_add_db_vg,
|
||||
)
|
||||
from daemon_lib.benchmark import list_benchmarks
|
||||
|
||||
from pvcapid.Daemon import config, strtobool, API_VERSION
|
||||
|
||||
import pvcapid.helper as api_helper
|
||||
import pvcapid.provisioner as api_provisioner
|
||||
import pvcapid.vmbuilder as api_vmbuilder
|
||||
import pvcapid.benchmark as api_benchmark
|
||||
import pvcapid.ova as api_ova
|
||||
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
@ -61,11 +48,11 @@ app = flask.Flask(__name__)
|
||||
# Set up SQLAlchemy backend
|
||||
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||
app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
|
||||
config["database_user"],
|
||||
config["database_password"],
|
||||
config["database_host"],
|
||||
config["database_port"],
|
||||
config["database_name"],
|
||||
config["api_postgresql_user"],
|
||||
config["api_postgresql_password"],
|
||||
config["api_postgresql_host"],
|
||||
config["api_postgresql_port"],
|
||||
config["api_postgresql_dbname"],
|
||||
)
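For reference, a quick sketch of the connection URI the renamed api_postgresql_* keys produce; the sample values below are hypothetical, only the format string matches the SQLALCHEMY_DATABASE_URI assignment above.

```python
# Hypothetical sample values; the format string mirrors the assignment above.
sample = {
    "api_postgresql_user": "pvcapi",
    "api_postgresql_password": "secret",
    "api_postgresql_host": "localhost",
    "api_postgresql_port": 5432,
    "api_postgresql_dbname": "pvcapi",
}

uri = "postgresql://{}:{}@{}:{}/{}".format(
    sample["api_postgresql_user"],
    sample["api_postgresql_password"],
    sample["api_postgresql_host"],
    sample["api_postgresql_port"],
    sample["api_postgresql_dbname"],
)
print(uri)  # postgresql://pvcapi:secret@localhost:5432/pvcapi
```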
|
||||
|
||||
if config["debug"]:
|
||||
@ -73,8 +60,8 @@ if config["debug"]:
|
||||
else:
|
||||
app.config["DEBUG"] = False
|
||||
|
||||
if config["auth_enabled"]:
|
||||
app.config["SECRET_KEY"] = config["auth_secret_key"]
|
||||
if config["api_auth_enabled"]:
|
||||
app.config["SECRET_KEY"] = config["api_auth_secret_key"]
|
||||
|
||||
# Create SQLAlchemy database
|
||||
db = SQLAlchemy(app)
|
||||
@ -99,41 +86,34 @@ def get_primary_node(zkhandler):
|
||||
return getPrimaryNode(zkhandler)
|
||||
|
||||
|
||||
# Set up Celery queue routing
|
||||
def route_task(name, args, kwargs, options, task=None, **kw):
|
||||
print("----")
|
||||
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
|
||||
# Set up Celery task ID generator
|
||||
# 1. Lets us make our own IDs (first section of UUID)
|
||||
# 2. Lets us distribute jobs to the required pvcworkerd instances
|
||||
def run_celery_task(task_name, **kwargs):
|
||||
task_id = str(uuid4()).split("-")[0]
|
||||
|
||||
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
|
||||
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
|
||||
run_on = kwargs[options["routing_key"]]
|
||||
if run_on == "primary":
|
||||
run_on = get_primary_node()
|
||||
# Otherwise, use the primary node
|
||||
if "run_on" in kwargs and kwargs["run_on"] != "primary":
|
||||
run_on = kwargs["run_on"]
|
||||
else:
|
||||
run_on = get_primary_node()
|
||||
|
||||
print(f"Selected Celery worker: {run_on}")
|
||||
print("----")
|
||||
|
||||
return run_on
|
||||
|
||||
|
||||
# Set up Celery task ID generator
|
||||
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
|
||||
def run_celery_task(task_def, **kwargs):
|
||||
task_id = str(uuid4()).split("-")[0]
|
||||
task = task_def.apply_async(
|
||||
(),
|
||||
kwargs,
|
||||
task_id=task_id,
|
||||
print(
|
||||
f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
|
||||
)
|
||||
|
||||
task = celery.send_task(
|
||||
task_name,
|
||||
task_id=task_id,
|
||||
kwargs=kwargs,
|
||||
queue=run_on,
|
||||
)
|
||||
|
||||
return task
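The rework above replaces locally registered task functions with name-based dispatch: run_celery_task() generates a short task ID, picks the target node, and hands the task to Celery by name on that node's queue. A minimal sketch of that dispatch; the broker URL, task name, node name, and kwargs are illustrative.

```python
from uuid import uuid4

from celery import Celery

# Illustrative broker URL; the real app is built from the keydb_* settings shown below.
celery = Celery("pvcapid", broker="redis://localhost:6379/0")

# Short, human-friendly task ID (the first section of a UUID), as in run_celery_task() above
task_id = str(uuid4()).split("-")[0]

# send_task() dispatches purely by name, so the API process no longer needs the
# task code imported; the per-node queue selects which pvcworkerd instance runs it.
task = celery.send_task(
    "vm.flush_locks",                              # task registered by the worker daemon
    task_id=task_id,
    kwargs={"domain": "testvm", "run_on": "hv1"},  # illustrative arguments
    queue="hv1",                                   # illustrative node/queue name
)
print(task.id)  # an AsyncResult handle comes back immediately
```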
|
||||
|
||||
|
||||
# Create celery definition
|
||||
celery_task_uri = "redis://{}:{}{}".format(
|
||||
config["queue_host"], config["queue_port"], config["queue_path"]
|
||||
config["keydb_host"], config["keydb_port"], config["keydb_path"]
|
||||
)
|
||||
celery = Celery(
|
||||
app.name,
|
||||
@ -153,7 +133,6 @@ def celery_startup():
|
||||
app.config["task_queues"] = tuple(
|
||||
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
|
||||
)
|
||||
app.config["task_routes"] = (route_task,)
|
||||
celery.conf.update(app.config)
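celery_startup() above declares one queue per cluster node, keyed by the node name. A small sketch of that declaration with a hard-coded node list in place of get_all_nodes(); the node names are illustrative.

```python
from kombu import Queue

nodes = ["hv1", "hv2", "hv3"]  # illustrative; the real list comes from get_all_nodes()

# One queue per node, so a task sent with queue="hv2" (see run_celery_task above)
# is consumed only by the pvcworkerd instance on that host.
task_queues = tuple(Queue(h, routing_key=f"{h}.#") for h in nodes)
print([q.name for q in task_queues])
```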
|
||||
|
||||
|
||||
@ -199,7 +178,7 @@ def Authenticator(function):
|
||||
@wraps(function)
|
||||
def authenticate(*args, **kwargs):
|
||||
# No authentication required
|
||||
if not config["auth_enabled"]:
|
||||
if not config["api_auth_enabled"]:
|
||||
return function(*args, **kwargs)
|
||||
# Session-based authentication
|
||||
if "token" in flask.session:
|
||||
@ -208,7 +187,7 @@ def Authenticator(function):
|
||||
if "X-Api-Key" in flask.request.headers:
|
||||
if any(
|
||||
token
|
||||
for token in config["auth_tokens"]
|
||||
for token in config["api_auth_tokens"]
|
||||
if flask.request.headers.get("X-Api-Key") == token.get("token")
|
||||
):
|
||||
return function(*args, **kwargs)
|
||||
@ -220,171 +199,6 @@ def Authenticator(function):
|
||||
return authenticate
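The Authenticator decorator above accepts either a session token set by /login or a per-request X-Api-Key header matched against the configured token list. A hedged client-side example follows; the host, port, and token value are placeholders, only the header name and the /api/v1 prefix come from this codebase.

```python
import requests

# Placeholder host, port, and token; X-Api-Key must match a configured token.
response = requests.get(
    "http://pvc-api.example.com:7370/api/v1/",
    headers={"X-Api-Key": "example-token-value"},
)
print(response.status_code, response.json())
```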
|
||||
|
||||
|
||||
#
|
||||
# Job functions
|
||||
#
|
||||
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
|
||||
def create_vm(
|
||||
self,
|
||||
vm_name=None,
|
||||
profile_name=None,
|
||||
define_vm=True,
|
||||
start_vm=True,
|
||||
script_run_args=[],
|
||||
run_on="primary",
|
||||
):
|
||||
return api_vmbuilder.create_vm(
|
||||
self,
|
||||
vm_name,
|
||||
profile_name,
|
||||
define_vm=define_vm,
|
||||
start_vm=start_vm,
|
||||
script_run_args=script_run_args,
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
|
||||
def run_benchmark(self, pool=None, run_on="primary"):
|
||||
return api_benchmark.run_benchmark(self, pool)
|
||||
|
||||
|
||||
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
|
||||
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
|
||||
@ZKConnection(config)
|
||||
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
|
||||
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
|
||||
|
||||
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
|
||||
|
||||
|
||||
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
|
||||
def vm_device_attach(self, domain=None, xml=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_vm_device_attach(zkhandler, self, domain, xml):
|
||||
return vm_worker_attach_device(zkhandler, self, domain, xml)
|
||||
|
||||
return run_vm_device_attach(self, domain, xml)
|
||||
|
||||
|
||||
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
|
||||
def vm_device_detach(self, domain=None, xml=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_vm_device_detach(zkhandler, self, domain, xml):
|
||||
return vm_worker_detach_device(zkhandler, self, domain, xml)
|
||||
|
||||
return run_vm_device_detach(self, domain, xml)
|
||||
|
||||
|
||||
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
||||
def osd_add(
|
||||
self,
|
||||
device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
split_count=None,
|
||||
run_on=None,
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_osd_add(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
device,
|
||||
weight,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
split_count=None,
|
||||
):
|
||||
return osd_worker_add_osd(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
device,
|
||||
weight,
|
||||
ext_db_ratio,
|
||||
ext_db_size,
|
||||
split_count,
|
||||
)
|
||||
|
||||
return run_osd_add(
|
||||
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
|
||||
)
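Each of these task shims follows the same pattern: the Celery-facing function defines an inner function decorated with @ZKConnection(config), which injects a live Zookeeper handler as the first argument, and then calls it with the remaining arguments. A rough, self-contained sketch of that injection pattern; this is an assumption about the shape of the decorator, not the real daemon_lib.zkhandler.ZKConnection.

```python
from functools import wraps


class ExampleZKConnection:
    """Stand-in for ZKConnection: a decorator built from the config that opens a
    connection, passes the handler as the wrapped function's first argument, and
    cleans up afterwards."""

    def __init__(self, config):
        self.config = config

    def __call__(self, function):
        @wraps(function)
        def connection(*args, **kwargs):
            zkhandler = {"coordinators": self.config["coordinators"]}  # fake handler
            try:
                return function(zkhandler, *args, **kwargs)
            finally:
                pass  # a real handler would disconnect here
        return connection


@ExampleZKConnection({"coordinators": ["hv1", "hv2", "hv3"]})
def run_example(zkhandler, osd_id):
    return f"would act on OSD {osd_id} using {zkhandler['coordinators']}"


print(run_example("0"))
```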
|
||||
|
||||
|
||||
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
|
||||
def osd_replace(
|
||||
self,
|
||||
osd_id=None,
|
||||
new_device=None,
|
||||
old_device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
run_on=None,
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_osd_replace(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
osd_id,
|
||||
new_device,
|
||||
old_device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
):
|
||||
return osd_worker_replace_osd(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
osd_id,
|
||||
new_device,
|
||||
old_device,
|
||||
weight,
|
||||
ext_db_ratio,
|
||||
ext_db_size,
|
||||
)
|
||||
|
||||
return run_osd_replace(
|
||||
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
|
||||
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
|
||||
return osd_worker_refresh_osd(
|
||||
zkhandler, self, run_on, osd_id, device, ext_db_flag
|
||||
)
|
||||
|
||||
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
|
||||
|
||||
|
||||
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
|
||||
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_remove(
|
||||
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
|
||||
):
|
||||
return osd_worker_remove_osd(
|
||||
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
|
||||
)
|
||||
|
||||
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
|
||||
|
||||
|
||||
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
|
||||
def osd_add_db_vg(self, device=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_add_db_vg(zkhandler, self, run_on, device):
|
||||
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
|
||||
|
||||
return run_osd_add_db_vg(self, run_on, device)
|
||||
|
||||
|
||||
##########################################################
|
||||
# API Root/Authentication
|
||||
##########################################################
|
||||
@ -469,12 +283,12 @@ class API_Login(Resource):
|
||||
type: object
|
||||
id: Message
|
||||
"""
|
||||
if not config["auth_enabled"]:
|
||||
if not config["api_auth_enabled"]:
|
||||
return flask.redirect(Api.url_for(api, API_Root))
|
||||
|
||||
if any(
|
||||
token
|
||||
for token in config["auth_tokens"]
|
||||
for token in config["api_auth_tokens"]
|
||||
if flask.request.values["token"] in token["token"]
|
||||
):
|
||||
flask.session["token"] = flask.request.form["token"]
|
||||
@ -503,7 +317,7 @@ class API_Logout(Resource):
|
||||
302:
|
||||
description: Authentication disabled
|
||||
"""
|
||||
if not config["auth_enabled"]:
|
||||
if not config["api_auth_enabled"]:
|
||||
return flask.redirect(Api.url_for(api, API_Root))
|
||||
|
||||
flask.session.pop("token", None)
|
||||
@ -2459,7 +2273,7 @@ class API_VM_Locks(Resource):
|
||||
else:
|
||||
return vm_node_detail, retcode
|
||||
|
||||
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
|
||||
task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
|
||||
|
||||
return (
|
||||
{"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
|
||||
@ -2594,7 +2408,7 @@ class API_VM_Device(Resource):
|
||||
else:
|
||||
return vm_node_detail, retcode
|
||||
|
||||
task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
|
||||
task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
|
||||
|
||||
return (
|
||||
{"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
|
||||
@ -2648,7 +2462,7 @@ class API_VM_Device(Resource):
|
||||
else:
|
||||
return vm_node_detail, retcode
|
||||
|
||||
task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
|
||||
task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
|
||||
|
||||
return (
|
||||
{"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
|
||||
@ -4364,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
|
||||
type: string (integer)
|
||||
description: The number of minor page faults during the test
|
||||
"""
|
||||
return api_benchmark.list_benchmarks(reqargs.get("job", None))
|
||||
return list_benchmarks(config, reqargs.get("job", None))
|
||||
|
||||
@RequestParser(
|
||||
[
|
||||
@ -4405,7 +4219,7 @@ class API_Storage_Ceph_Benchmark(Resource):
|
||||
}, 400
|
||||
|
||||
task = run_celery_task(
|
||||
run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
|
||||
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
|
||||
)
|
||||
return (
|
||||
{
|
||||
@ -4531,7 +4345,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
|
||||
node = reqargs.get("node", None)
|
||||
|
||||
task = run_celery_task(
|
||||
osd_add_db_vg, device=reqargs.get("device", None), run_on=node
|
||||
"osd.add_db_vg", device=reqargs.get("device", None), run_on=node
|
||||
)
|
||||
|
||||
return (
|
||||
@ -4730,7 +4544,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
|
||||
node = reqargs.get("node", None)
|
||||
|
||||
task = run_celery_task(
|
||||
osd_add,
|
||||
"osd.add",
|
||||
device=reqargs.get("device", None),
|
||||
weight=reqargs.get("weight", None),
|
||||
ext_db_ratio=reqargs.get("ext_db_ratio", None),
|
||||
@ -4849,7 +4663,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
return osd_node_detail, retcode
|
||||
|
||||
task = run_celery_task(
|
||||
osd_replace,
|
||||
"osd.replace",
|
||||
osd_id=osdid,
|
||||
new_device=reqargs.get("new_device"),
|
||||
old_device=reqargs.get("old_device", None),
|
||||
@ -4907,7 +4721,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
return osd_node_detail, retcode
|
||||
|
||||
task = run_celery_task(
|
||||
osd_refresh,
|
||||
"osd.refresh",
|
||||
osd_id=osdid,
|
||||
device=reqargs.get("device", None),
|
||||
ext_db_flag=False,
|
||||
@ -4978,7 +4792,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
|
||||
return osd_node_detail, retcode
|
||||
|
||||
task = run_celery_task(
|
||||
osd_remove,
|
||||
"osd.remove",
|
||||
osd_id=osdid,
|
||||
force_flag=reqargs.get("force", False),
|
||||
run_on=node,
|
||||
@ -8507,7 +8321,7 @@ class API_Provisioner_Create_Root(Resource):
|
||||
start_vm = False
|
||||
|
||||
task = run_celery_task(
|
||||
create_vm,
|
||||
"provisioner.create",
|
||||
vm_name=reqargs.get("name", None),
|
||||
profile_name=reqargs.get("profile", None),
|
||||
define_vm=define_vm,
|
||||
|
@ -48,11 +48,11 @@ import pvcapid.provisioner as provisioner
|
||||
# Database connections
|
||||
def open_database(config):
|
||||
conn = psycopg2.connect(
|
||||
host=config["database_host"],
|
||||
port=config["database_port"],
|
||||
dbname=config["database_name"],
|
||||
user=config["database_user"],
|
||||
password=config["database_password"],
|
||||
host=config["api_postgresql_host"],
|
||||
port=config["api_postgresql_port"],
|
||||
dbname=config["api_postgresql_name"],
|
||||
user=config["api_postgresql_user"],
|
||||
password=config["api_postgresql_password"],
|
||||
)
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
return conn, cur
|
||||
|
@ -63,11 +63,11 @@ class ProvisioningError(Exception):
|
||||
# Database connections
|
||||
def open_database(config):
|
||||
conn = psycopg2.connect(
|
||||
host=config["database_host"],
|
||||
port=config["database_port"],
|
||||
dbname=config["database_name"],
|
||||
user=config["database_user"],
|
||||
password=config["database_password"],
|
||||
host=config["api_postgresql_host"],
|
||||
port=config["api_postgresql_port"],
|
||||
dbname=config["api_postgresql_dbname"],
|
||||
user=config["api_postgresql_user"],
|
||||
password=config["api_postgresql_password"],
|
||||
)
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
return conn, cur
|
||||
|
@ -1,16 +0,0 @@
# Parallel Virtual Cluster Celery Worker daemon unit file

[Unit]
Description = Parallel Virtual Cluster Celery Worker daemon
After = network-online.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStart = /usr/share/pvc/pvcworkerd.sh
Restart = on-failure

[Install]
WantedBy = multi-user.target
@ -42,7 +42,7 @@ echo " done. Package version ${version}."

# Install the client(s) locally
echo -n "Installing client packages locally..."
$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null
$SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
echo " done".

for HOST in ${HOSTS[@]}; do
@ -59,12 +59,16 @@ fi
for HOST in ${HOSTS[@]}; do
echo "> Deploying packages to host ${HOST}"
echo -n "Installing packages..."
ssh $HOST $SUDO dpkg -i /tmp/pvc/*.deb &>/dev/null
ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
ssh $HOST rm -rf /tmp/pvc &>/dev/null
echo " done."
echo -n "Restarting PVC daemons..."
ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
echo " done."
echo -n "Waiting for node daemon to be running..."
@ -13,9 +13,11 @@ echo ${new_ver} >&3
|
||||
tmpdir=$( mktemp -d )
|
||||
cp -a debian/changelog client-cli/setup.py ${tmpdir}/
|
||||
cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
|
||||
cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
|
||||
cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
|
||||
cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
|
||||
# Replace the "base" version with the git revision version
|
||||
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
|
||||
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
|
||||
sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
|
||||
cat <<EOF > debian/changelog
|
||||
pvc (${new_ver}) unstable; urgency=medium
|
||||
@ -33,6 +35,8 @@ dpkg-buildpackage -us -uc
|
||||
cp -a ${tmpdir}/changelog debian/changelog
|
||||
cp -a ${tmpdir}/setup.py client-cli/setup.py
|
||||
cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
|
||||
cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
|
||||
cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
|
||||
cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
|
||||
|
||||
# Clean up
|
||||
|
@ -17,9 +17,10 @@ echo "# Write the changelog below; comments will be ignored" >> ${changelog_file
|
||||
$EDITOR ${changelog_file}
|
||||
|
||||
changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
|
||||
rm ${changelog_file}
|
||||
|
||||
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
|
||||
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," health-daemon/pvchealthd/Daemon.py
|
||||
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," worker-daemon/pvcworkerd/Daemon.py
|
||||
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
|
||||
sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
|
||||
echo ${new_version} > .version
|
||||
@ -46,11 +47,13 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
|
||||
echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
|
||||
mv ${deb_changelog_file} debian/changelog
|
||||
|
||||
git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
|
||||
git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
|
||||
git commit -v
|
||||
|
||||
popd &>/dev/null
|
||||
|
||||
rm ${changelog_file}
|
||||
|
||||
echo
|
||||
echo "Release message:"
|
||||
echo
|
||||
|
@ -5960,8 +5960,8 @@ def cli(
"PVC_COLOUR": Force colour on the output even if Click determines it is not a console (e.g. with 'watch')

If a "-c"/"--connection"/"PVC_CONNECTION" is not specified, the CLI will attempt to read a "local" connection
from the API configuration at "/etc/pvc/pvcapid.yaml". If no such configuration is found, the command will
abort with an error. This applies to all commands except those under "connection".
from the API configuration at "/etc/pvc/pvc.conf". If no such configuration is found, the command will abort
with an error. This applies to all commands except those under "connection".
"""

global CLI_CONFIG
@ -33,14 +33,14 @@ from subprocess import run, PIPE
|
||||
from sys import argv
|
||||
from syslog import syslog, openlog, closelog, LOG_AUTH
|
||||
from yaml import load as yload
|
||||
from yaml import BaseLoader, SafeLoader
|
||||
from yaml import SafeLoader
|
||||
|
||||
import pvc.lib.provisioner
|
||||
import pvc.lib.vm
|
||||
import pvc.lib.node
|
||||
|
||||
|
||||
DEFAULT_STORE_DATA = {"cfgfile": "/etc/pvc/pvcapid.yaml"}
|
||||
DEFAULT_STORE_DATA = {"cfgfile": "/etc/pvc/pvc.conf"}
|
||||
DEFAULT_STORE_FILENAME = "pvc.json"
|
||||
DEFAULT_API_PREFIX = "/api/v1"
|
||||
DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0]
|
||||
@ -88,14 +88,15 @@ def read_config_from_yaml(cfgfile):
|
||||
|
||||
try:
|
||||
with open(cfgfile) as fh:
|
||||
api_config = yload(fh, Loader=BaseLoader)["pvc"]["api"]
|
||||
api_config = yload(fh, Loader=SafeLoader)["api"]
|
||||
|
||||
host = api_config["listen_address"]
|
||||
port = api_config["listen_port"]
|
||||
scheme = "https" if strtobool(api_config["ssl"]["enabled"]) else "http"
|
||||
host = api_config["listen"]["address"]
|
||||
port = api_config["listen"]["port"]
|
||||
scheme = "https" if api_config["ssl"]["enabled"] else "http"
|
||||
api_key = (
|
||||
api_config["authentication"]["tokens"][0]["token"]
|
||||
if strtobool(api_config["authentication"]["enabled"])
|
||||
api_config["token"][0]["token"]
|
||||
if api_config["authentication"]["enabled"]
|
||||
and api_config["authentication"]["source"] == "token"
|
||||
else None
|
||||
)
|
||||
except KeyError:
|
||||
|
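After this change the CLI reads the top-level "api" section of pvc.conf directly (rather than the old pvc.api subtree of pvcapid.yaml) to build its "local" connection. A minimal sketch of that derivation with placeholder values, mirroring the fields used in the hunk above.

```python
# Placeholder values shaped like the new pvc.conf "api" section.
api_config = {
    "listen": {"address": "10.0.0.10", "port": 7370},
    "ssl": {"enabled": False},
    "authentication": {"enabled": True, "source": "token"},
    "token": [{"token": "example-token"}],
}

host = api_config["listen"]["address"]
port = api_config["listen"]["port"]
scheme = "https" if api_config["ssl"]["enabled"] else "http"
api_key = (
    api_config["token"][0]["token"]
    if api_config["authentication"]["enabled"]
    and api_config["authentication"]["source"] == "token"
    else None
)
print(f"{scheme}://{host}:{port}/api/v1", api_key)
```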
@ -2,7 +2,7 @@ from setuptools import setup

setup(
name="pvc",
version="0.9.82",
version="0.9.83",
packages=["pvc.cli", "pvc.lib"],
install_requires=[
"Click",
46
api-daemon/pvcapid/benchmark.py → daemon-common/benchmark.py
Executable file → Normal file
@ -25,9 +25,6 @@ import psycopg2.extras
|
||||
from datetime import datetime
|
||||
from json import loads, dumps
|
||||
|
||||
from pvcapid.Daemon import config
|
||||
|
||||
from daemon_lib.zkhandler import ZKHandler
|
||||
from daemon_lib.celery import start, fail, log_info, update, finish
|
||||
|
||||
import daemon_lib.common as pvc_common
|
||||
@ -135,11 +132,11 @@ def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
|
||||
# Database connections
|
||||
def open_database(config):
|
||||
conn = psycopg2.connect(
|
||||
host=config["database_host"],
|
||||
port=config["database_port"],
|
||||
dbname=config["database_name"],
|
||||
user=config["database_user"],
|
||||
password=config["database_password"],
|
||||
host=config["api_postgresql_host"],
|
||||
port=config["api_postgresql_port"],
|
||||
dbname=config["api_postgresql_dbname"],
|
||||
user=config["api_postgresql_user"],
|
||||
password=config["api_postgresql_password"],
|
||||
)
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
return conn, cur
|
||||
@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
|
||||
conn.close()
|
||||
|
||||
|
||||
def list_benchmarks(job=None):
|
||||
def list_benchmarks(config, job=None):
|
||||
if job is not None:
|
||||
query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
|
||||
args = (job,)
|
||||
@ -278,17 +275,8 @@ def run_benchmark_job(
|
||||
return jstdout
|
||||
|
||||
|
||||
def run_benchmark(self, pool):
|
||||
def worker_run_benchmark(zkhandler, celery, config, pool):
|
||||
# Phase 0 - connect to databases
|
||||
try:
|
||||
zkhandler = ZKHandler(config)
|
||||
zkhandler.connect()
|
||||
except Exception:
|
||||
fail(
|
||||
self,
|
||||
"Failed to connect to Zookeeper",
|
||||
)
|
||||
|
||||
cur_time = datetime.now().isoformat(timespec="seconds")
|
||||
cur_primary = zkhandler.read("base.config.primary_node")
|
||||
job_name = f"{cur_time}_{cur_primary}"
|
||||
@ -296,7 +284,7 @@ def run_benchmark(self, pool):
|
||||
current_stage = 0
|
||||
total_stages = 13
|
||||
start(
|
||||
self,
|
||||
celery,
|
||||
f"Running storage benchmark '{job_name}' on pool '{pool}'",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -312,13 +300,13 @@ def run_benchmark(self, pool):
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
fail(
|
||||
self,
|
||||
celery,
|
||||
"Failed to connect to Postgres",
|
||||
)
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
self,
|
||||
celery,
|
||||
"Storing running status in database",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -340,11 +328,11 @@ def run_benchmark(self, pool):
|
||||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
|
||||
fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
self,
|
||||
celery,
|
||||
"Creating benchmark volume",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -363,7 +351,7 @@ def run_benchmark(self, pool):
|
||||
for test in test_matrix:
|
||||
current_stage += 1
|
||||
update(
|
||||
self,
|
||||
celery,
|
||||
f"Running benchmark job '{test}'",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -381,7 +369,7 @@ def run_benchmark(self, pool):
|
||||
# Phase 3 - cleanup
|
||||
current_stage += 1
|
||||
update(
|
||||
self,
|
||||
celery,
|
||||
"Cleaning up venchmark volume",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -397,7 +385,7 @@ def run_benchmark(self, pool):
|
||||
|
||||
current_stage += 1
|
||||
update(
|
||||
self,
|
||||
celery,
|
||||
"Storing results in database",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
@ -415,7 +403,7 @@ def run_benchmark(self, pool):
|
||||
db_cur=db_cur,
|
||||
zkhandler=zkhandler,
|
||||
)
|
||||
fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
|
||||
fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)
|
||||
|
||||
cleanup(
|
||||
job_name,
|
||||
@ -426,7 +414,7 @@ def run_benchmark(self, pool):
|
||||
|
||||
current_stage += 1
|
||||
return finish(
|
||||
self,
|
||||
celery,
|
||||
f"Storage benchmark {job_name} completed successfully",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
@ -19,8 +19,6 @@
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
import daemon_lib.common as common
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import yaml
|
||||
@ -29,7 +27,6 @@ from socket import gethostname
|
||||
from re import findall
|
||||
from psutil import cpu_count
|
||||
from ipaddress import ip_address, ip_network
|
||||
from json import loads
|
||||
|
||||
|
||||
class MalformedConfigurationError(Exception):
|
||||
@ -70,29 +67,16 @@ def get_static_data():
|
||||
|
||||
|
||||
def get_configuration_path():
|
||||
config_file = None
|
||||
try:
|
||||
_config_file = "/etc/pvc/pvcnoded.yaml"
|
||||
if not os.path.exists(_config_file):
|
||||
raise
|
||||
config_file = _config_file
|
||||
config_type = "legacy"
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
_config_file = os.environ["PVC_CONFIG_FILE"]
|
||||
if not os.path.exists(_config_file):
|
||||
raise
|
||||
config_file = _config_file
|
||||
config_type = "current"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not config_file:
|
||||
print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
|
||||
os._exit(1)
|
||||
|
||||
return config_file, config_type
|
||||
return config_file
|
||||
|
||||
|
||||
def get_hostname():
|
||||
@ -137,7 +121,7 @@ def validate_floating_ip(config, network):
|
||||
return True, ""
|
||||
|
||||
|
||||
def get_configuration_current(config_file):
|
||||
def get_parsed_configuration(config_file):
|
||||
print('Loading configuration from file "{}"'.format(config_file))
|
||||
|
||||
with open(config_file, "r") as cfgfh:
|
||||
@ -271,17 +255,17 @@ def get_configuration_current(config_file):
|
||||
"keydb_port": o_database["keydb"]["port"],
|
||||
"keydb_host": o_database["keydb"]["hostname"],
|
||||
"keydb_path": o_database["keydb"]["path"],
|
||||
"metadata_postgresql_port": o_database["postgres"]["port"],
|
||||
"metadata_postgresql_host": o_database["postgres"]["hostname"],
|
||||
"metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
|
||||
"api_postgresql_port": o_database["postgres"]["port"],
|
||||
"api_postgresql_host": o_database["postgres"]["hostname"],
|
||||
"api_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
|
||||
"database"
|
||||
],
|
||||
"metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
|
||||
"api_postgresql_user": o_database["postgres"]["credentials"]["api"][
|
||||
"username"
|
||||
],
|
||||
"metadata_postgresql_password": o_database["postgres"]["credentials"][
|
||||
"api"
|
||||
]["password"],
|
||||
"api_postgresql_password": o_database["postgres"]["credentials"]["api"][
|
||||
"password"
|
||||
],
|
||||
"pdns_postgresql_port": o_database["postgres"]["port"],
|
||||
"pdns_postgresql_host": o_database["postgres"]["hostname"],
|
||||
"pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
|
||||
@ -335,9 +319,7 @@ def get_configuration_current(config_file):
|
||||
"log_keepalive_cluster_details": o_logging.get(
|
||||
"log_cluster_details", False
|
||||
),
|
||||
"log_keepalive_plugin_details": o_logging.get(
|
||||
"log_monitoring_details", False
|
||||
),
|
||||
"log_monitoring_details": o_logging.get("log_monitoring_details", False),
|
||||
"console_log_lines": o_logging.get("console_log_lines", False),
|
||||
"node_log_lines": o_logging.get("node_log_lines", False),
|
||||
}
|
||||
@ -362,323 +344,64 @@ def get_configuration_current(config_file):
|
||||
+ o_ceph["ceph_keyring_file"],
|
||||
"ceph_monitor_port": o_ceph["monitor_port"],
|
||||
"ceph_secret_uuid": o_ceph["secret_uuid"],
|
||||
"storage_hosts": o_ceph.get("monitor_hosts", None),
|
||||
}
|
||||
config = {**config, **config_ceph}
|
||||
|
||||
# Add our node static data to the config
|
||||
config["static_data"] = get_static_data()
|
||||
o_api = o_config["api"]
|
||||
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_configuration_legacy(pvcnoded_config_file):
|
||||
print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
|
||||
|
||||
with open(pvcnoded_config_file, "r") as cfgfile:
|
||||
try:
|
||||
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
|
||||
except Exception as e:
|
||||
print("ERROR: Failed to parse configuration file: {}".format(e))
|
||||
os._exit(1)
|
||||
|
||||
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
|
||||
|
||||
# Create the configuration dictionary
|
||||
config = dict()
|
||||
|
||||
# Get the initial base configuration
|
||||
try:
|
||||
o_base = o_config["pvc"]
|
||||
o_cluster = o_config["pvc"]["cluster"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_general = {
|
||||
"node": o_base.get("node", node_hostname),
|
||||
"node_hostname": node_hostname,
|
||||
"node_fqdn": node_fqdn,
|
||||
"node_domain": node_domain,
|
||||
"node_id": node_id,
|
||||
"coordinators": o_cluster.get("coordinators", list()),
|
||||
"debug": o_base.get("debug", False),
|
||||
}
|
||||
|
||||
config = {**config, **config_general}
|
||||
|
||||
# Get the functions configuration
|
||||
try:
|
||||
o_functions = o_config["pvc"]["functions"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_functions = {
|
||||
"enable_hypervisor": o_functions.get("enable_hypervisor", False),
|
||||
"enable_networking": o_functions.get("enable_networking", False),
|
||||
"enable_storage": o_functions.get("enable_storage", False),
|
||||
"enable_worker": o_functions.get("enable_worker", True),
|
||||
"enable_api": o_functions.get("enable_api", False),
|
||||
}
|
||||
|
||||
config = {**config, **config_functions}
|
||||
|
||||
# Get the directory configuration
|
||||
try:
|
||||
o_directories = o_config["pvc"]["system"]["configuration"]["directories"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_directories = {
|
||||
"plugin_directory": o_directories.get(
|
||||
"plugin_directory", "/usr/share/pvc/plugins"
|
||||
),
|
||||
"dynamic_directory": o_directories.get("dynamic_directory", None),
|
||||
"log_directory": o_directories.get("log_directory", None),
|
||||
"console_log_directory": o_directories.get("console_log_directory", None),
|
||||
}
|
||||
|
||||
# Define our dynamic directory schema
|
||||
config_directories["dnsmasq_dynamic_directory"] = (
|
||||
config_directories["dynamic_directory"] + "/dnsmasq"
|
||||
)
|
||||
config_directories["pdns_dynamic_directory"] = (
|
||||
config_directories["dynamic_directory"] + "/pdns"
|
||||
)
|
||||
config_directories["nft_dynamic_directory"] = (
|
||||
config_directories["dynamic_directory"] + "/nft"
|
||||
)
|
||||
|
||||
# Define our log directory schema
|
||||
config_directories["dnsmasq_log_directory"] = (
|
||||
config_directories["log_directory"] + "/dnsmasq"
|
||||
)
|
||||
config_directories["pdns_log_directory"] = (
|
||||
config_directories["log_directory"] + "/pdns"
|
||||
)
|
||||
config_directories["nft_log_directory"] = (
|
||||
config_directories["log_directory"] + "/nft"
|
||||
)
|
||||
|
||||
config = {**config, **config_directories}
|
||||
|
||||
# Get the logging configuration
|
||||
try:
|
||||
o_logging = o_config["pvc"]["system"]["configuration"]["logging"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_logging = {
|
||||
"file_logging": o_logging.get("file_logging", False),
|
||||
"stdout_logging": o_logging.get("stdout_logging", False),
|
||||
"zookeeper_logging": o_logging.get("zookeeper_logging", False),
|
||||
"log_colours": o_logging.get("log_colours", False),
|
||||
"log_dates": o_logging.get("log_dates", False),
|
||||
"log_keepalives": o_logging.get("log_keepalives", False),
|
||||
"log_keepalive_cluster_details": o_logging.get(
|
||||
"log_keepalive_cluster_details", False
|
||||
),
|
||||
"log_keepalive_plugin_details": o_logging.get(
|
||||
"log_keepalive_plugin_details", False
|
||||
),
|
||||
"console_log_lines": o_logging.get("console_log_lines", False),
|
||||
"node_log_lines": o_logging.get("node_log_lines", False),
|
||||
}
|
||||
|
||||
config = {**config, **config_logging}
|
||||
|
||||
# Get the interval configuration
|
||||
try:
|
||||
o_intervals = o_config["pvc"]["system"]["intervals"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_intervals = {
|
||||
"vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
|
||||
"keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
|
||||
"monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
|
||||
"fence_intervals": int(o_intervals.get("fence_intervals", 6)),
|
||||
"suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
|
||||
}
|
||||
|
||||
config = {**config, **config_intervals}
|
||||
|
||||
# Get the fencing configuration
|
||||
try:
|
||||
o_fencing = o_config["pvc"]["system"]["fencing"]
|
||||
o_fencing_actions = o_fencing["actions"]
|
||||
o_fencing_ipmi = o_fencing["ipmi"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_fencing = {
|
||||
"successful_fence": o_fencing_actions.get("successful_fence", None),
|
||||
"failed_fence": o_fencing_actions.get("failed_fence", None),
|
||||
"ipmi_hostname": o_fencing_ipmi.get(
|
||||
"host", f"{node_hostname}-lom.{node_domain}"
|
||||
),
|
||||
"ipmi_username": o_fencing_ipmi.get("user", "null"),
|
||||
"ipmi_password": o_fencing_ipmi.get("pass", "null"),
|
||||
}
|
||||
|
||||
config = {**config, **config_fencing}
|
||||
|
||||
# Get the migration configuration
|
||||
try:
|
||||
o_migration = o_config["pvc"]["system"]["migration"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_migration = {
|
||||
"migration_target_selector": o_migration.get("target_selector", "mem"),
|
||||
}
|
||||
|
||||
config = {**config, **config_migration}
|
||||
|
||||
if config["enable_networking"]:
|
||||
# Get the node networks configuration
|
||||
try:
|
||||
o_networks = o_config["pvc"]["cluster"]["networks"]
|
||||
o_network_cluster = o_networks["cluster"]
|
||||
o_network_storage = o_networks["storage"]
|
||||
o_network_upstream = o_networks["upstream"]
|
||||
o_sysnetworks = o_config["pvc"]["system"]["configuration"]["networking"]
|
||||
o_sysnetwork_cluster = o_sysnetworks["cluster"]
|
||||
o_sysnetwork_storage = o_sysnetworks["storage"]
|
||||
o_sysnetwork_upstream = o_sysnetworks["upstream"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_networks = {
|
||||
"cluster_domain": o_network_cluster.get("domain", None),
|
||||
"cluster_network": o_network_cluster.get("network", None),
|
||||
"cluster_floating_ip": o_network_cluster.get("floating_ip", None),
|
||||
"cluster_dev": o_sysnetwork_cluster.get("device", None),
|
||||
"cluster_mtu": o_sysnetwork_cluster.get("mtu", None),
|
||||
"cluster_dev_ip": o_sysnetwork_cluster.get("address", None),
|
||||
"storage_domain": o_network_storage.get("domain", None),
|
||||
"storage_network": o_network_storage.get("network", None),
|
||||
"storage_floating_ip": o_network_storage.get("floating_ip", None),
|
||||
"storage_dev": o_sysnetwork_storage.get("device", None),
|
||||
"storage_mtu": o_sysnetwork_storage.get("mtu", None),
|
||||
"storage_dev_ip": o_sysnetwork_storage.get("address", None),
|
||||
"upstream_domain": o_network_upstream.get("domain", None),
|
||||
"upstream_network": o_network_upstream.get("network", None),
|
||||
"upstream_floating_ip": o_network_upstream.get("floating_ip", None),
|
||||
"upstream_gateway": o_network_upstream.get("gateway", None),
|
||||
"upstream_dev": o_sysnetwork_upstream.get("device", None),
|
||||
"upstream_mtu": o_sysnetwork_upstream.get("mtu", None),
|
||||
"upstream_dev_ip": o_sysnetwork_upstream.get("address", None),
|
||||
"bridge_dev": o_sysnetworks.get("bridge_device", None),
|
||||
"bridge_mtu": o_sysnetworks.get("bridge_mtu", None),
|
||||
"enable_sriov": o_sysnetworks.get("sriov_enable", False),
|
||||
"sriov_device": o_sysnetworks.get("sriov_device", list()),
|
||||
o_api_listen = o_api["listen"]
|
||||
config_api_listen = {
|
||||
"api_listen_address": o_api_listen["address"],
|
||||
"api_listen_port": o_api_listen["port"],
|
||||
}
|
||||
config = {**config, **config_api_listen}
|
||||
|
||||
if config_networks["bridge_mtu"] is None:
|
||||
# Read the current MTU of bridge_dev and set bridge_mtu to it; avoids weird resets
|
||||
retcode, stdout, stderr = common.run_os_command(
|
||||
f"ip -json link show dev {config_networks['bridge_dev']}"
|
||||
)
|
||||
current_bridge_mtu = loads(stdout)[0]["mtu"]
|
||||
print(
|
||||
f"Config key bridge_mtu not explicitly set; using live MTU {current_bridge_mtu} from {config_networks['bridge_dev']}"
|
||||
)
|
||||
config_networks["bridge_mtu"] = current_bridge_mtu
|
||||
o_api_authentication = o_api["authentication"]
|
||||
config_api_authentication = {
|
||||
"api_auth_enabled": o_api_authentication.get("enabled", False),
|
||||
"api_auth_secret_key": o_api_authentication.get("secret_key", ""),
|
||||
"api_auth_source": o_api_authentication.get("source", "token"),
|
||||
}
|
||||
config = {**config, **config_api_authentication}
|
||||
|
||||
config = {**config, **config_networks}
|
||||
o_api_ssl = o_api["ssl"]
|
||||
config_api_ssl = {
|
||||
"api_ssl_enabled": o_api_ssl.get("enabled", False),
|
||||
"api_ssl_cert_file": o_api_ssl.get("certificate", None),
|
||||
"api_ssl_key_file": o_api_ssl.get("private_key", None),
|
||||
}
|
||||
config = {**config, **config_api_ssl}
|
||||
|
||||
for network_type in ["cluster", "storage", "upstream"]:
|
||||
result, msg = validate_floating_ip(config, network_type)
|
||||
if not result:
|
||||
raise MalformedConfigurationError(msg)
|
||||
# Use coordinators as storage hosts if not explicitly specified
|
||||
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
|
||||
config["storage_hosts"] = config["coordinators"]
|
||||
|
||||
address_key = "{}_dev_ip".format(network_type)
|
||||
network_key = f"{network_type}_network"
|
||||
network = ip_network(config[network_key])
|
||||
# With autoselection of addresses, construct an IP from the relevant network
|
||||
if config[address_key] == "by-id":
|
||||
# The NodeID starts at 1, but indexes start at 0
|
||||
address_id = int(config["node_id"]) - 1
|
||||
# Grab the nth address from the network
|
||||
config[address_key] = "{}/{}".format(
|
||||
list(network.hosts())[address_id], network.prefixlen
|
||||
# Set up our token list if specified
|
||||
if config["api_auth_source"] == "token":
|
||||
config["api_auth_tokens"] = o_api["token"]
|
||||
else:
|
||||
if config["api_auth_enabled"]:
|
||||
print(
|
||||
"WARNING: No authentication method provided; disabling API authentication."
|
||||
)
|
||||
# Validate the provided IP instead
|
||||
else:
|
||||
try:
|
||||
address = ip_address(config[address_key].split("/")[0])
|
||||
if address not in list(network.hosts()):
|
||||
raise
|
||||
except Exception:
|
||||
raise MalformedConfigurationError(
|
||||
f"IP address {config[address_key]} for {address_key} is not valid"
|
||||
)
|
||||
|
||||
# Get the PowerDNS aggregator database configuration
|
||||
try:
|
||||
o_pdnsdb = o_config["pvc"]["coordinator"]["dns"]["database"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_pdnsdb = {
|
||||
"pdns_postgresql_host": o_pdnsdb.get("host", None),
|
||||
"pdns_postgresql_port": o_pdnsdb.get("port", None),
|
||||
"pdns_postgresql_dbname": o_pdnsdb.get("name", None),
|
||||
"pdns_postgresql_user": o_pdnsdb.get("user", None),
|
||||
"pdns_postgresql_password": o_pdnsdb.get("pass", None),
|
||||
}
|
||||
|
||||
config = {**config, **config_pdnsdb}
|
||||
|
||||
# Get the Cloud-Init Metadata database configuration
|
||||
try:
|
||||
o_metadatadb = o_config["pvc"]["coordinator"]["metadata"]["database"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_metadatadb = {
|
||||
"metadata_postgresql_host": o_metadatadb.get("host", None),
|
||||
"metadata_postgresql_port": o_metadatadb.get("port", None),
|
||||
"metadata_postgresql_dbname": o_metadatadb.get("name", None),
|
||||
"metadata_postgresql_user": o_metadatadb.get("user", None),
|
||||
"metadata_postgresql_password": o_metadatadb.get("pass", None),
|
||||
}
|
||||
|
||||
config = {**config, **config_metadatadb}
|
||||
|
||||
if config["enable_storage"]:
|
||||
# Get the storage configuration
|
||||
try:
|
||||
o_storage = o_config["pvc"]["system"]["configuration"]["storage"]
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
config_storage = {
|
||||
"ceph_config_file": o_storage.get("ceph_config_file", None),
|
||||
"ceph_admin_keyring": o_storage.get("ceph_admin_keyring", None),
|
||||
}
|
||||
|
||||
config = {**config, **config_storage}
|
||||
config["api_auth_enabled"] = False
|
||||
|
||||
# Add our node static data to the config
|
||||
config["static_data"] = get_static_data()
|
||||
|
||||
except Exception as e:
|
||||
raise MalformedConfigurationError(e)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_configuration():
|
||||
"""
|
||||
Parse the configuration of the node daemon.
|
||||
Get the configuration.
|
||||
"""
|
||||
pvc_config_file, pvc_config_type = get_configuration_path()
|
||||
|
||||
if pvc_config_type == "legacy":
|
||||
config = get_configuration_legacy(pvc_config_file)
|
||||
else:
|
||||
config = get_configuration_current(pvc_config_file)
|
||||
|
||||
pvc_config_file = get_configuration_path()
|
||||
config = get_parsed_configuration(pvc_config_file)
|
||||
return config
|
||||
|
||||
|
@ -80,7 +80,7 @@ class Logger(object):
|
||||
self.config["log_directory"] + "/" + self.config["daemon_name"] + ".log"
|
||||
)
|
||||
# We open the logfile for the duration of our session, but have a hup function
|
||||
self.writer = open(self.logfile, "a", buffering=0)
|
||||
self.writer = open(self.logfile, "a")
|
||||
|
||||
self.last_colour = ""
|
||||
self.last_prompt = ""
|
||||
@ -93,12 +93,10 @@ class Logger(object):
|
||||
# Provide a hup function to close and reopen the writer
|
||||
def hup(self):
|
||||
self.writer.close()
|
||||
self.writer = open(self.logfile, "a", buffering=0)
|
||||
self.writer = open(self.logfile, "a")
|
||||
|
||||
# Provide a termination function so all messages are flushed before terminating the main daemon
|
||||
def terminate(self):
|
||||
if self.config["file_logging"]:
|
||||
self.writer.close()
|
||||
if self.config["zookeeper_logging"]:
|
||||
self.out("Waiting for Zookeeper message queue to drain", state="s")
|
||||
|
||||
@ -112,6 +110,9 @@ class Logger(object):
|
||||
self.zookeeper_logger.stop()
|
||||
self.zookeeper_logger.join()
|
||||
|
||||
if self.config["file_logging"]:
|
||||
self.writer.close()
|
||||
|
||||
# Output function
|
||||
def out(self, message, state=None, prefix=""):
|
||||
# Get the date
|
||||
@ -152,11 +153,14 @@ class Logger(object):
|
||||
# Assemble output string
|
||||
output = colour + prompt + endc + date + prefix + message
|
||||
self.writer.write(output + "\n")
|
||||
self.writer.flush()
|
||||
|
||||
# Log to Zookeeper
|
||||
if self.config["zookeeper_logging"]:
|
||||
# Set the daemon value (only used here as others do not overlap with different daemons)
|
||||
daemon = f"{self.config['daemon_name']}: "
|
||||
# Expand to match all daemon names (pvcnoded, pvcworkerd, pvchealthd)
|
||||
daemon = "{daemon: <12}".format(daemon=daemon)
|
||||
# Assemble output string
|
||||
output = daemon + colour + prompt + endc + date + prefix + message
|
||||
self.zookeeper_queue.put(output)
|
||||
|
28
api-daemon/pvcapid/vmbuilder.py → daemon-common/vmbuilder.py
Executable file → Normal file
@ -31,8 +31,6 @@ import uuid
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
from pvcapid.Daemon import config
|
||||
|
||||
from daemon_lib.zkhandler import ZKHandler
|
||||
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
|
||||
|
||||
@ -167,11 +165,11 @@ def chroot(destination):
|
||||
def open_db(config):
|
||||
try:
|
||||
conn = psycopg2.connect(
|
||||
host=config["database_host"],
|
||||
port=config["database_port"],
|
||||
dbname=config["database_name"],
|
||||
user=config["database_user"],
|
||||
password=config["database_password"],
|
||||
host=config["api_postgresql_host"],
|
||||
port=config["api_postgresql_port"],
|
||||
dbname=config["api_postgresql_name"],
|
||||
user=config["api_postgresql_user"],
|
||||
password=config["api_postgresql_password"],
|
||||
)
|
||||
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
except Exception:
|
||||
@ -216,8 +214,14 @@ def open_zk(config):
|
||||
#
|
||||
# Main VM provisioning function - executed by the Celery worker
|
||||
#
|
||||
def create_vm(
|
||||
celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
|
||||
def worker_create_vm(
|
||||
celery,
|
||||
config,
|
||||
vm_name,
|
||||
vm_profile,
|
||||
define_vm=True,
|
||||
start_vm=True,
|
||||
script_run_args=[],
|
||||
):
|
||||
current_stage = 0
|
||||
total_stages = 11
|
||||
@ -326,9 +330,9 @@ def create_vm(
|
||||
vm_data["system_architecture"] = stdout.strip()
|
||||
|
||||
monitor_list = list()
|
||||
coordinator_names = config["storage_hosts"]
|
||||
for coordinator in coordinator_names:
|
||||
monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
|
||||
monitor_names = config["storage_hosts"]
|
||||
for monitor in monitor_names:
|
||||
monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
|
||||
vm_data["ceph_monitor_list"] = monitor_list
|
||||
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
|
||||
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]
|
14
debian/changelog
vendored
@ -1,3 +1,17 @@
|
||||
pvc (0.9.83-0) unstable; urgency=high
|
||||
|
||||
**Breaking Changes:** This release features a breaking change for the daemon config. A new unified "pvc.conf" file is required for all daemons (and the CLI client for Autobackup and API-on-this-host functionality), which will be written by the "pvc" role in the PVC Ansible framework. Using the "update-pvc-daemons" oneshot playbook from PVC Ansible is **required** to update to this release, as it will ensure this file is written to the proper place before deploying the new package versions, and also ensures that the old entries are cleaned up afterwards. In addition, this release fully splits the node worker and health subsystems into discrete daemons ("pvcworkerd" and "pvchealthd") and packages ("pvc-daemon-worker" and "pvc-daemon-health") respectively. The "pvc-daemon-node" package also now depends on both packages, and the "pvc-daemon-api" package can now be reliably used outside of the PVC nodes themselves (for instance, in a VM) without any strange cross-dependency issues.
|
||||
|
||||
* [All] Unifies all daemon (and on-node CLI task) configuration into a "pvc.conf" YAML configuration.
|
||||
* [All] Splits the node worker subsystem into a discrete codebase and package ("pvc-daemon-worker"), still named "pvcworkerd".
|
||||
* [All] Splits the node health subsystem into a discrete codebase and package ("pvc-daemon-health"), named "pvchealthd".
|
||||
* [All] Improves Zookeeper node logging to avoid bugs and to support multiple simultaneous daemon writes.
|
||||
* [All] Fixes several bugs in file logging and splits file logs by daemon.
|
||||
* [Node Daemon] Improves several log messages to match new standards from Health daemon.
|
||||
* [API Daemon] Reworks Celery task routing and handling to move all worker tasks to Worker daemon.
|
||||
|
||||
-- Joshua M. Boniface <joshua@boniface.me> Fri, 01 Dec 2023 17:33:53 -0500
|
||||
|
||||
pvc (0.9.82-0) unstable; urgency=high
|
||||
|
||||
* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).
|
||||
|
22
debian/control
vendored
@ -4,19 +4,35 @@ Priority: optional
|
||||
Maintainer: Joshua Boniface <joshua@boniface.me>
|
||||
Standards-Version: 3.9.8
|
||||
Homepage: https://www.boniface.me
|
||||
X-Python3-Version: >= 3.2
|
||||
X-Python3-Version: >= 3.7
|
||||
|
||||
Package: pvc-daemon-node
|
||||
Architecture: all
|
||||
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
|
||||
Depends: systemd, pvc-daemon-common, pvc-daemon-health, pvc-daemon-worker, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
|
||||
Description: Parallel Virtual Cluster node daemon
|
||||
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
||||
.
|
||||
This package installs the PVC node daemon
|
||||
|
||||
Package: pvc-daemon-health
|
||||
Architecture: all
|
||||
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
|
||||
Description: Parallel Virtual Cluster health daemon
|
||||
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
||||
.
|
||||
This package installs the PVC health monitoring daemon
|
||||
|
||||
Package: pvc-daemon-worker
|
||||
Architecture: all
|
||||
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
|
||||
Description: Parallel Virtual Cluster worker daemon
|
||||
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
||||
.
|
||||
This package installs the PVC Celery task worker daemon
|
||||
|
||||
Package: pvc-daemon-api
|
||||
Architecture: all
|
||||
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
|
||||
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
|
||||
Description: Parallel Virtual Cluster API daemon
|
||||
A KVM/Zookeeper/Ceph-based VM and private cloud manager
|
||||
.
|
||||
|
2
debian/pvc-daemon-api.install
vendored
@ -3,7 +3,5 @@ api-daemon/pvcapid-manage*.py usr/share/pvc
|
||||
api-daemon/pvc-api-db-upgrade usr/share/pvc
|
||||
api-daemon/pvcapid usr/share/pvc
|
||||
api-daemon/pvcapid.service lib/systemd/system
|
||||
api-daemon/pvcworkerd.service lib/systemd/system
|
||||
api-daemon/pvcworkerd.sh usr/share/pvc
|
||||
api-daemon/provisioner usr/share/pvc
|
||||
api-daemon/migrations usr/share/pvc
|
||||
|
5
debian/pvc-daemon-api.preinst
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Remove any cached CPython directories or files
|
||||
echo "Cleaning up existing CPython files"
|
||||
find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
4
debian/pvc-daemon-health.install
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
health-daemon/pvchealthd.py usr/share/pvc
|
||||
health-daemon/pvchealthd usr/share/pvc
|
||||
health-daemon/pvchealthd.service lib/systemd/system
|
||||
health-daemon/plugins usr/share/pvc
|
14
debian/pvc-daemon-health.postinst
vendored
Normal file
@ -0,0 +1,14 @@
#!/bin/sh

# Reload systemd's view of the units
systemctl daemon-reload

# Enable the service and target
systemctl enable /lib/systemd/system/pvchealthd.service

# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvchealthd.service; then
    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
else
    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi
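As context for the NOTE messages above: the postinst only enables the unit, so a minimal sketch of the manual follow-up (assuming /etc/pvc/pvc.conf has already been written, normally by the "pvc" role in PVC Ansible) might look like this; these are hypothetical operator steps, not part of the packaging itself:

# Hypothetical follow-up after installing pvc-daemon-health
ls /etc/pvc/pvc.conf                 # confirm the unified config is in place
systemctl start pvchealthd.service   # start the health daemon
systemctl status pvchealthd.service  # verify it came up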
6
debian/pvc-daemon-health.preinst
vendored
Normal file
@ -0,0 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Remove any cached CPython directories or files
|
||||
echo "Cleaning up existing CPython files"
|
||||
find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
||||
find /usr/share/pvc/plugins -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
4
debian/pvc-daemon-health.prerm
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Disable the services
|
||||
systemctl disable pvchealthd.service
|
1
debian/pvc-daemon-node.install
vendored
@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
|
||||
node-daemon/pvc.target lib/systemd/system
|
||||
node-daemon/pvcautoready.service lib/systemd/system
|
||||
node-daemon/monitoring usr/share/pvc
|
||||
node-daemon/plugins usr/share/pvc
|
||||
|
2
debian/pvc-daemon-node.preinst
vendored
@ -2,4 +2,4 @@
|
||||
|
||||
# Remove any cached CPython directories or files
|
||||
echo "Cleaning up existing CPython files"
|
||||
find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
||||
find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
||||
|
4
debian/pvc-daemon-worker.install
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
worker-daemon/pvcworkerd.sh usr/share/pvc
|
||||
worker-daemon/pvcworkerd.py usr/share/pvc
|
||||
worker-daemon/pvcworkerd usr/share/pvc
|
||||
worker-daemon/pvcworkerd.service lib/systemd/system
|
14
debian/pvc-daemon-worker.postinst
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Reload systemd's view of the units
|
||||
systemctl daemon-reload
|
||||
|
||||
# Enable the service and target
|
||||
systemctl enable /lib/systemd/system/pvcworkerd.service
|
||||
|
||||
# Inform administrator of the service restart/startup not occurring automatically
|
||||
if systemctl is-active --quiet pvcworkerd.service; then
|
||||
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
|
||||
else
|
||||
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
|
||||
fi
|
5
debian/pvc-daemon-worker.preinst
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Remove any cached CPython directories or files
|
||||
echo "Cleaning up existing CPython files"
|
||||
find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
|
4
debian/pvc-daemon-worker.prerm
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Disable the services
|
||||
systemctl disable pvcworkerd.service
|
@ -8,7 +8,7 @@ import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
|
||||
os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"
|
||||
|
||||
sys.path.append('api-daemon')
|
||||
|
||||
|
1
health-daemon/daemon_lib
Symbolic link
@ -0,0 +1 @@
|
||||
../daemon-common
|
@ -39,7 +39,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -40,7 +40,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -39,7 +39,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
||||
@ -75,7 +75,7 @@ class MonitoringPluginScript(MonitoringPlugin):
|
||||
# Set the health delta to 0 (no change)
|
||||
health_delta = 0
|
||||
# Craft a message that can be used by the clients
|
||||
message = "Successfully connected to Libvirtd on localhost"
|
||||
message = "Successfully connected to KeyDB/Redis on localhost"
|
||||
|
||||
# Check the Zookeeper connection
|
||||
try:
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -39,7 +39,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
||||
@ -67,8 +67,8 @@ class MonitoringPluginScript(MonitoringPlugin):
|
||||
# Run any imports first
|
||||
from psycopg2 import connect
|
||||
|
||||
conn_metadata = None
|
||||
cur_metadata = None
|
||||
conn_api = None
|
||||
cur_api = None
|
||||
conn_dns = None
|
||||
cur_dns = None
|
||||
|
||||
@ -79,25 +79,25 @@ class MonitoringPluginScript(MonitoringPlugin):
|
||||
|
||||
# Check the Metadata database (primary)
|
||||
try:
|
||||
conn_metadata = connect(
|
||||
conn_api = connect(
|
||||
host=self.this_node.name,
|
||||
port=self.config["metadata_postgresql_port"],
|
||||
dbname=self.config["metadata_postgresql_dbname"],
|
||||
user=self.config["metadata_postgresql_user"],
|
||||
password=self.config["metadata_postgresql_password"],
|
||||
port=self.config["api_postgresql_port"],
|
||||
dbname=self.config["api_postgresql_dbname"],
|
||||
user=self.config["api_postgresql_user"],
|
||||
password=self.config["api_postgresql_password"],
|
||||
)
|
||||
cur_metadata = conn_metadata.cursor()
|
||||
cur_metadata.execute("""SELECT * FROM alembic_version""")
|
||||
data = cur_metadata.fetchone()
|
||||
cur_api = conn_api.cursor()
|
||||
cur_api.execute("""SELECT * FROM alembic_version""")
|
||||
data = cur_api.fetchone()
|
||||
except Exception as e:
|
||||
health_delta = 50
|
||||
err = str(e).split('\n')[0]
|
||||
message = f"Failed to connect to PostgreSQL database {self.config['metadata_postgresql_dbname']}: {err}"
|
||||
message = f"Failed to connect to PostgreSQL database {self.config['api_postgresql_dbname']}: {err}"
|
||||
finally:
|
||||
if cur_metadata is not None:
|
||||
cur_metadata.close()
|
||||
if conn_metadata is not None:
|
||||
conn_metadata.close()
|
||||
if cur_api is not None:
|
||||
cur_api.close()
|
||||
if conn_api is not None:
|
||||
conn_api.close()
|
||||
|
||||
if health_delta == 0:
|
||||
# Check the PowerDNS database (secondary)
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
@ -38,7 +38,7 @@
|
||||
|
||||
# This import is always required here, as MonitoringPlugin is used by the
|
||||
# MonitoringPluginScript class
|
||||
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
|
||||
|
||||
|
||||
# A monitoring plugin script must always expose its nice name, which must be identical to
|
24
health-daemon/pvchealthd.py
Executable file
@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# pvchealthd.py - Health daemon startup stub
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, version 3.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
import pvchealthd.Daemon # noqa: F401
|
||||
|
||||
pvchealthd.Daemon.entrypoint()
|
20
health-daemon/pvchealthd.service
Normal file
@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file

[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure

[Install]
WantedBy = pvc.target
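For illustration, the environment this unit encodes could be reproduced by hand when debugging the daemon in the foreground; a hedged sketch, not a documented invocation:

cd /usr/share/pvc                          # matches WorkingDirectory above
export PYTHONUNBUFFERED=true               # matches the first Environment line
export PVC_CONFIG_FILE=/etc/pvc/pvc.conf   # unified config read by all daemons
./pvchealthd.py                            # the same entrypoint as ExecStart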
145
health-daemon/pvchealthd/Daemon.py
Normal file
@ -0,0 +1,145 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Daemon.py - Health daemon main entrypoint
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, version 3.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
import pvchealthd.util.zookeeper
|
||||
|
||||
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
|
||||
import pvchealthd.objects.NodeInstance as NodeInstance
|
||||
|
||||
import daemon_lib.config as cfg
|
||||
import daemon_lib.log as log
|
||||
|
||||
from time import sleep
|
||||
|
||||
import os
|
||||
import signal
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.83"
|
||||
|
||||
|
||||
##########################################################
|
||||
# Entrypoint
|
||||
##########################################################
|
||||
|
||||
|
||||
def entrypoint():
|
||||
monitoring_instance = None
|
||||
|
||||
# Get our configuration
|
||||
config = cfg.get_configuration()
|
||||
config["daemon_name"] = "pvchealthd"
|
||||
config["daemon_version"] = version
|
||||
|
||||
# Set up the logger instance
|
||||
logger = log.Logger(config)
|
||||
|
||||
# Print our startup message
|
||||
logger.out("")
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("| |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
||||
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
|
||||
logger.out("| |")
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("| Parallel Virtual Cluster health daemon v{0: <20} |".format(version))
|
||||
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
|
||||
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
|
||||
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
|
||||
logger.out("| ID: {0: <56} |".format(config["node_id"]))
|
||||
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
|
||||
logger.out("| Machine details: |")
|
||||
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
|
||||
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
|
||||
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
|
||||
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("")
|
||||
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
|
||||
|
||||
# Connect to Zookeeper and return our handler and current schema version
|
||||
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
|
||||
|
||||
# Define a cleanup function
|
||||
def cleanup(failure=False):
|
||||
nonlocal logger, zkhandler, monitoring_instance
|
||||
|
||||
logger.out("Terminating pvchealthd and cleaning up", state="s")
|
||||
|
||||
# Shut down the monitoring system
|
||||
try:
|
||||
logger.out("Shutting down monitoring subsystem", state="s")
|
||||
monitoring_instance.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Close the Zookeeper connection
|
||||
try:
|
||||
zkhandler.disconnect(persistent=True)
|
||||
del zkhandler
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.out("Terminated health daemon", state="s")
|
||||
logger.terminate()
|
||||
|
||||
if failure:
|
||||
retcode = 1
|
||||
else:
|
||||
retcode = 0
|
||||
|
||||
os._exit(retcode)
|
||||
|
||||
# Termination function
|
||||
def term(signum="", frame=""):
|
||||
cleanup(failure=False)
|
||||
|
||||
# Hangup (logrotate) function
|
||||
def hup(signum="", frame=""):
|
||||
if config["file_logging"]:
|
||||
logger.hup()
|
||||
|
||||
# Handle signals gracefully
|
||||
signal.signal(signal.SIGTERM, term)
|
||||
signal.signal(signal.SIGINT, term)
|
||||
signal.signal(signal.SIGQUIT, term)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
|
||||
this_node = NodeInstance.NodeInstance(
|
||||
config["node_hostname"],
|
||||
zkhandler,
|
||||
config,
|
||||
logger,
|
||||
)
|
||||
|
||||
# Set up the node monitoring instance and thread
|
||||
monitoring_instance = MonitoringInstance.MonitoringInstance(
|
||||
zkhandler, config, logger, this_node
|
||||
)
|
||||
|
||||
# Tick loop; does nothing since everything is async
|
||||
while True:
|
||||
try:
|
||||
sleep(1)
|
||||
except Exception:
|
||||
break
|
0
health-daemon/pvchealthd/__init__.py
Normal file
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# PluginInstance.py - Class implementing a PVC monitoring instance
|
||||
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
||||
@ -317,8 +317,17 @@ class MonitoringInstance(object):
|
||||
)
|
||||
|
||||
if successful_plugins < 1:
|
||||
self.logger.out(
|
||||
"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
|
||||
state="e",
|
||||
)
|
||||
return
|
||||
|
||||
self.logger.out(
|
||||
f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Clean up any old plugin data for which a plugin file no longer exists
|
||||
plugins_data = self.zkhandler.children(
|
||||
("node.monitoring.data", self.this_node.name)
|
||||
@ -344,6 +353,7 @@ class MonitoringInstance(object):
|
||||
def shutdown(self):
|
||||
self.stop_check_timer()
|
||||
self.run_cleanups()
|
||||
return
|
||||
|
||||
def start_check_timer(self):
|
||||
check_interval = self.config["monitoring_interval"]
|
||||
@ -395,6 +405,11 @@ class MonitoringInstance(object):
|
||||
active_coordinator_state = self.this_node.coordinator_state
|
||||
|
||||
runtime_start = datetime.now()
|
||||
self.logger.out(
|
||||
"Starting monitoring healthcheck run",
|
||||
state="t",
|
||||
)
|
||||
|
||||
total_health = 100
|
||||
plugin_results = list()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
|
||||
@ -406,10 +421,7 @@ class MonitoringInstance(object):
|
||||
plugin_results.append(future.result())
|
||||
|
||||
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
|
||||
if (
|
||||
self.config["log_keepalives"]
|
||||
and self.config["log_keepalive_plugin_details"]
|
||||
):
|
||||
if self.config["log_monitoring_details"]:
|
||||
self.logger.out(
|
||||
result.message + f" [-{result.health_delta}]",
|
||||
state="t",
|
224
health-daemon/pvchealthd/objects/NodeInstance.py
Normal file
@ -0,0 +1,224 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# NodeInstance.py - Class implementing a PVC node in pvchealthd
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, version 3.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
|
||||
class NodeInstance(object):
|
||||
# Initialization function
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
zkhandler,
|
||||
config,
|
||||
logger,
|
||||
):
|
||||
# Passed-in variables on creation
|
||||
self.name = name
|
||||
self.zkhandler = zkhandler
|
||||
self.config = config
|
||||
self.logger = logger
|
||||
# States
|
||||
self.daemon_state = "stop"
|
||||
self.coordinator_state = "client"
|
||||
self.domain_state = "flushed"
|
||||
# Node resources
|
||||
self.health = 100
|
||||
self.active_domains_count = 0
|
||||
self.provisioned_domains_count = 0
|
||||
self.memused = 0
|
||||
self.memfree = 0
|
||||
self.memalloc = 0
|
||||
self.vcpualloc = 0
|
||||
|
||||
# Zookeeper handlers for changed states
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.state.daemon", self.name)
|
||||
)
|
||||
def watch_node_daemonstate(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = "stop"
|
||||
|
||||
if data != self.daemon_state:
|
||||
self.daemon_state = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.state.router", self.name)
|
||||
)
|
||||
def watch_node_routerstate(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = "client"
|
||||
|
||||
if data != self.coordinator_state:
|
||||
self.coordinator_state = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.state.domain", self.name)
|
||||
)
|
||||
def watch_node_domainstate(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = "unknown"
|
||||
|
||||
if data != self.domain_state:
|
||||
self.domain_state = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.monitoring.health", self.name)
|
||||
)
|
||||
def watch_node_health(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 100
|
||||
|
||||
try:
|
||||
data = int(data)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if data != self.health:
|
||||
self.health = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.memory.free", self.name)
|
||||
)
|
||||
def watch_node_memfree(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 0
|
||||
|
||||
if data != self.memfree:
|
||||
self.memfree = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.memory.used", self.name)
|
||||
)
|
||||
def watch_node_memused(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 0
|
||||
|
||||
if data != self.memused:
|
||||
self.memused = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.memory.allocated", self.name)
|
||||
)
|
||||
def watch_node_memalloc(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 0
|
||||
|
||||
if data != self.memalloc:
|
||||
self.memalloc = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
|
||||
)
|
||||
def watch_node_vcpualloc(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 0
|
||||
|
||||
if data != self.vcpualloc:
|
||||
self.vcpualloc = data
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.running_domains", self.name)
|
||||
)
|
||||
def watch_node_runningdomains(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii").split()
|
||||
except AttributeError:
|
||||
data = []
|
||||
|
||||
if len(data) != self.active_domains_count:
|
||||
self.active_domains_count = len(data)
|
||||
|
||||
@self.zkhandler.zk_conn.DataWatch(
|
||||
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
|
||||
)
|
||||
def watch_node_domainscount(data, stat, event=""):
|
||||
if event and event.type == "DELETED":
|
||||
# The key has been deleted after existing before; terminate this watcher
|
||||
# because this class instance is about to be reaped in Daemon.py
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode("ascii")
|
||||
except AttributeError:
|
||||
data = 0
|
||||
|
||||
if data != self.provisioned_domains_count:
|
||||
self.provisioned_domains_count = data
|
0
health-daemon/pvchealthd/objects/__init__.py
Normal file
0
health-daemon/pvchealthd/util/__init__.py
Normal file
187
health-daemon/pvchealthd/util/zookeeper.py
Normal file
@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# zookeeper.py - Utility functions for pvchealthd Zookeeper connections
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, version 3.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
from daemon_lib.zkhandler import ZKHandler
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
def connect(logger, config):
|
||||
# Create an instance of the handler
|
||||
zkhandler = ZKHandler(config, logger)
|
||||
|
||||
try:
|
||||
logger.out(
|
||||
"Connecting to Zookeeper on coordinator nodes {}".format(
|
||||
config["coordinators"]
|
||||
),
|
||||
state="i",
|
||||
)
|
||||
# Start connection
|
||||
zkhandler.connect(persistent=True)
|
||||
except Exception as e:
|
||||
logger.out(
|
||||
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
|
||||
)
|
||||
os._exit(1)
|
||||
|
||||
logger.out("Validating Zookeeper schema", state="i")
|
||||
|
||||
try:
|
||||
node_schema_version = int(
|
||||
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
|
||||
)
|
||||
except Exception:
|
||||
node_schema_version = int(zkhandler.read("base.schema.version"))
|
||||
zkhandler.write(
|
||||
[
|
||||
(
|
||||
("node.data.active_schema", config["node_hostname"]),
|
||||
node_schema_version,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
# Load in the current node schema version
|
||||
zkhandler.schema.load(node_schema_version)
|
||||
|
||||
# Record the latest installed schema version
|
||||
latest_schema_version = zkhandler.schema.find_latest()
|
||||
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
|
||||
zkhandler.write(
|
||||
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
|
||||
)
|
||||
|
||||
# If we are the last node to get a schema update, fire the master update
|
||||
if latest_schema_version > node_schema_version:
|
||||
node_latest_schema_version = list()
|
||||
for node in zkhandler.children("base.node"):
|
||||
node_latest_schema_version.append(
|
||||
int(zkhandler.read(("node.data.latest_schema", node)))
|
||||
)
|
||||
|
||||
# This is true if every node's latest schema version matches the latest available version,
# i.e. they have all had the latest schema installed and are ready to load it.
|
||||
if node_latest_schema_version.count(latest_schema_version) == len(
|
||||
node_latest_schema_version
|
||||
):
|
||||
zkhandler.write([("base.schema.version", latest_schema_version)])
|
||||
|
||||
return zkhandler, node_schema_version
|
||||
|
||||
|
||||
def validate_schema(logger, zkhandler):
|
||||
# Validate our schema against the active version
|
||||
if not zkhandler.schema.validate(zkhandler, logger):
|
||||
logger.out("Found schema violations, applying", state="i")
|
||||
zkhandler.schema.apply(zkhandler)
|
||||
else:
|
||||
logger.out("Schema successfully validated", state="o")
|
||||
|
||||
|
||||
def setup_node(logger, config, zkhandler):
|
||||
# Check if our node exists in Zookeeper, and create it if not
|
||||
if config["daemon_mode"] == "coordinator":
|
||||
init_routerstate = "secondary"
|
||||
else:
|
||||
init_routerstate = "client"
|
||||
|
||||
if zkhandler.exists(("node", config["node_hostname"])):
|
||||
logger.out(
|
||||
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
|
||||
)
|
||||
# Update static data just in case it's changed
|
||||
zkhandler.write(
|
||||
[
|
||||
(("node", config["node_hostname"]), config["daemon_mode"]),
|
||||
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
|
||||
(("node.state.daemon", config["node_hostname"]), "init"),
|
||||
(("node.state.router", config["node_hostname"]), init_routerstate),
|
||||
(
|
||||
("node.data.static", config["node_hostname"]),
|
||||
" ".join(config["static_data"]),
|
||||
),
|
||||
(
|
||||
("node.data.pvc_version", config["node_hostname"]),
|
||||
config["daemon_version"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.hostname", config["node_hostname"]),
|
||||
config["ipmi_hostname"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.username", config["node_hostname"]),
|
||||
config["ipmi_username"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.password", config["node_hostname"]),
|
||||
config["ipmi_password"],
|
||||
),
|
||||
]
|
||||
)
|
||||
else:
|
||||
logger.out(
|
||||
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
|
||||
state="i",
|
||||
)
|
||||
keepalive_time = int(time.time())
|
||||
zkhandler.write(
|
||||
[
|
||||
(("node", config["node_hostname"]), config["daemon_mode"]),
|
||||
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
|
||||
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
|
||||
(("node.state.daemon", config["node_hostname"]), "init"),
|
||||
(("node.state.domain", config["node_hostname"]), "flushed"),
|
||||
(("node.state.router", config["node_hostname"]), init_routerstate),
|
||||
(
|
||||
("node.data.static", config["node_hostname"]),
|
||||
" ".join(config["static_data"]),
|
||||
),
|
||||
(
|
||||
("node.data.pvc_version", config["node_hostname"]),
|
||||
config["daemon_version"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.hostname", config["node_hostname"]),
|
||||
config["ipmi_hostname"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.username", config["node_hostname"]),
|
||||
config["ipmi_username"],
|
||||
),
|
||||
(
|
||||
("node.ipmi.password", config["node_hostname"]),
|
||||
config["ipmi_password"],
|
||||
),
|
||||
(("node.memory.total", config["node_hostname"]), "0"),
|
||||
(("node.memory.used", config["node_hostname"]), "0"),
|
||||
(("node.memory.free", config["node_hostname"]), "0"),
|
||||
(("node.memory.allocated", config["node_hostname"]), "0"),
|
||||
(("node.memory.provisioned", config["node_hostname"]), "0"),
|
||||
(("node.vcpu.allocated", config["node_hostname"]), "0"),
|
||||
(("node.cpu.load", config["node_hostname"]), "0.0"),
|
||||
(("node.running_domains", config["node_hostname"]), "0"),
|
||||
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
|
||||
(("node.count.networks", config["node_hostname"]), "0"),
|
||||
]
|
||||
)
|
@ -20,14 +20,12 @@
|
||||
###############################################################################
|
||||
|
||||
import pvcnoded.util.keepalive
|
||||
import pvcnoded.util.config
|
||||
import pvcnoded.util.fencing
|
||||
import pvcnoded.util.networking
|
||||
import pvcnoded.util.services
|
||||
import pvcnoded.util.libvirt
|
||||
import pvcnoded.util.zookeeper
|
||||
|
||||
import pvcnoded.objects.MonitoringInstance as MonitoringInstance
|
||||
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
|
||||
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
|
||||
import pvcnoded.objects.VMInstance as VMInstance
|
||||
@ -36,6 +34,7 @@ import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
|
||||
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
|
||||
import pvcnoded.objects.CephInstance as CephInstance
|
||||
|
||||
import daemon_lib.config as cfg
|
||||
import daemon_lib.log as log
|
||||
import daemon_lib.common as common
|
||||
|
||||
@ -49,7 +48,7 @@ import re
|
||||
import json
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.82"
|
||||
version = "0.9.83"
|
||||
|
||||
|
||||
##########################################################
|
||||
@ -59,41 +58,40 @@ version = "0.9.82"
|
||||
|
||||
def entrypoint():
|
||||
keepalive_timer = None
|
||||
monitoring_instance = None
|
||||
|
||||
# Get our configuration
|
||||
config = pvcnoded.util.config.get_configuration()
|
||||
config = cfg.get_configuration()
|
||||
config["daemon_name"] = "pvcnoded"
|
||||
config["daemon_version"] = version
|
||||
|
||||
# Create and validate our directories
|
||||
pvcnoded.util.config.validate_directories(config)
|
||||
cfg.validate_directories(config)
|
||||
|
||||
# Set up the logger instance
|
||||
logger = log.Logger(config)
|
||||
|
||||
# Print our startup message
|
||||
logger.out("")
|
||||
logger.out("|------------------------------------------------------------|")
|
||||
logger.out("| |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
||||
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
|
||||
logger.out("| |")
|
||||
logger.out("|------------------------------------------------------------|")
|
||||
logger.out("| Parallel Virtual Cluster node daemon v{0: <20} |".format(version))
|
||||
logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
|
||||
logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
|
||||
logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
|
||||
logger.out("| ID: {0: <54} |".format(config["node_id"]))
|
||||
logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
|
||||
logger.out("| Machine details: |")
|
||||
logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
|
||||
logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
|
||||
logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
|
||||
logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
|
||||
logger.out("|------------------------------------------------------------|")
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("| |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
||||
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
|
||||
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
|
||||
logger.out("| |")
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("| Parallel Virtual Cluster node daemon v{0: <22} |".format(version))
|
||||
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
|
||||
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
|
||||
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
|
||||
logger.out("| ID: {0: <56} |".format(config["node_id"]))
|
||||
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
|
||||
logger.out("| Machine details: |")
|
||||
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
|
||||
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
|
||||
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
|
||||
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
|
||||
logger.out("|--------------------------------------------------------------|")
|
||||
logger.out("")
|
||||
logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
|
||||
|
||||
@ -202,7 +200,7 @@ def entrypoint():
|
||||
|
||||
# Define a cleanup function
|
||||
def cleanup(failure=False):
|
||||
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
|
||||
nonlocal logger, zkhandler, keepalive_timer, d_domain
|
||||
|
||||
logger.out("Terminating pvcnoded and cleaning up", state="s")
|
||||
|
||||
@ -251,13 +249,6 @@ def entrypoint():
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Shut down the monitoring system
|
||||
try:
|
||||
logger.out("Shutting down monitoring subsystem", state="s")
|
||||
monitoring_instance.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Set stop state in Zookeeper
|
||||
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
|
||||
|
||||
@ -395,8 +386,8 @@ def entrypoint():
|
||||
|
||||
node_list = new_node_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}Node list:{logger.fmt_end} {" ".join(node_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Update node objects lists
|
||||
@ -563,8 +554,8 @@ def entrypoint():
|
||||
# Update the new list
|
||||
network_list = new_network_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}Network list:{logger.fmt_end} {" ".join(network_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Update node objects list
|
||||
@ -894,8 +885,8 @@ def entrypoint():
|
||||
|
||||
sriov_vf_list = sorted(new_sriov_vf_list)
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
if config["enable_hypervisor"]:
|
||||
@ -921,8 +912,8 @@ def entrypoint():
|
||||
# Update the new list
|
||||
domain_list = new_domain_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Update node objects' list
|
||||
@ -948,8 +939,8 @@ def entrypoint():
|
||||
# Update the new list
|
||||
osd_list = new_osd_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Pool objects
|
||||
@ -973,8 +964,8 @@ def entrypoint():
|
||||
# Update the new list
|
||||
pool_list = new_pool_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Volume objects (in each pool)
|
||||
@ -1005,15 +996,10 @@ def entrypoint():
|
||||
# Update the new list
|
||||
volume_list[pool] = new_volume_list
|
||||
logger.out(
|
||||
f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
|
||||
state="i",
|
||||
f'{logger.fmt_cyan}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Set up the node monitoring instance and thread
|
||||
monitoring_instance = MonitoringInstance.MonitoringInstance(
|
||||
zkhandler, config, logger, this_node
|
||||
)
|
||||
|
||||
# Start keepalived thread
|
||||
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
|
||||
logger, config, zkhandler, this_node
|
||||
|
@ -131,11 +131,11 @@ class MetadataAPIInstance(object):
|
||||
# Helper functions
|
||||
def open_database(self):
|
||||
conn = psycopg2.connect(
|
||||
host=self.config["metadata_postgresql_host"],
|
||||
port=self.config["metadata_postgresql_port"],
|
||||
dbname=self.config["metadata_postgresql_dbname"],
|
||||
user=self.config["metadata_postgresql_user"],
|
||||
password=self.config["metadata_postgresql_password"],
|
||||
host=self.config["api_postgresql_host"],
|
||||
port=self.config["api_postgresql_port"],
|
||||
dbname=self.config["api_postgresql_dbname"],
|
||||
user=self.config["api_postgresql_user"],
|
||||
password=self.config["api_postgresql_password"],
|
||||
)
|
||||
cur = conn.cursor(cursor_factory=RealDictCursor)
|
||||
return conn, cur
|
||||
|
@ -700,6 +700,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
active_coordinator_state = this_node.coordinator_state
|
||||
|
||||
runtime_start = datetime.now()
|
||||
logger.out(
|
||||
"Starting node keepalive run",
|
||||
state="t",
|
||||
)
|
||||
|
||||
# Set the migration selector in Zookeeper for clients to read
|
||||
if config["enable_hypervisor"]:
|
||||
|
@ -78,13 +78,19 @@ def start_keydb(logger, config):
        common.run_os_command("systemctl start keydb-server.service")


def start_worker(logger, config):
def start_workerd(logger, config):
    if config["enable_worker"]:
        logger.out("Starting Celery Worker daemon", state="i")
        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
        common.run_os_command("systemctl start pvcworkerd.service")


def start_healthd(logger, config):
    logger.out("Starting Health Monitoring daemon", state="i")
    # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
    common.run_os_command("systemctl start pvchealthd.service")


def start_system_services(logger, config):
    start_zookeeper(logger, config)
    start_libvirtd(logger, config)
@ -93,7 +99,8 @@ def start_system_services(logger, config):
    start_ceph_mon(logger, config)
    start_ceph_mgr(logger, config)
    start_keydb(logger, config)
    start_worker(logger, config)
    start_workerd(logger, config)
    start_healthd(logger, config)

    logger.out("Waiting 10 seconds for daemons to start", state="s")
    sleep(10)
    logger.out("Waiting 5 seconds for daemons to start", state="s")
    sleep(5)
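For illustration only (not from the diff): the two new start functions share the same "log, then systemctl start" shape; a generic helper along these lines would express it once. subprocess is used here purely to keep the example self-contained, whereas the daemon itself goes through common.run_os_command.

import subprocess

def start_unit(description, unit):
    # Mirrors the pattern of start_workerd() and start_healthd()
    print(f"Starting {description}")
    subprocess.run(["systemctl", "start", unit], check=False)

start_unit("Celery Worker daemon", "pvcworkerd.service")
start_unit("Health Monitoring daemon", "pvchealthd.service")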
@ -290,7 +290,7 @@ logging:
  # Enable or disable cluster detail logging during keepalive events
  log_cluster_details: yes

  # Enable or disable monitoring detail logging during keepalive events
  # Enable or disable monitoring detail logging during healthcheck events
  log_monitoring_details: yes

  # Number of VM console log lines to store in Zookeeper (per VM)
1 worker-daemon/daemon_lib (Symbolic link)
@ -0,0 +1 @@
../daemon-common
24 worker-daemon/pvcworkerd.py (Executable file)
@ -0,0 +1,24 @@
#!/usr/bin/env python3

# pvcworkerd.py - Worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import pvcworkerd.Daemon  # noqa: F401

pvcworkerd.Daemon.entrypoint()
20 worker-daemon/pvcworkerd.service (Normal file)
@ -0,0 +1,20 @@
# Parallel Virtual Cluster worker daemon unit file

[Unit]
Description = Parallel Virtual Cluster worker daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcworkerd.sh
ExecStopPost = /bin/sleep 2
Restart = on-failure

[Install]
WantedBy = pvc.target
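The unit points the worker at the unified pvc.conf via the PVC_CONFIG_FILE environment variable. A rough sketch, assuming a YAML loader like the one the shared daemon_lib.config module provides (the real loader is not shown in this diff), of how that variable could be honoured:

import os
import yaml

def get_configuration(default_path="/etc/pvc/pvc.conf"):
    # Prefer the path set by the systemd unit, falling back to the default
    config_path = os.environ.get("PVC_CONFIG_FILE", default_path)
    with open(config_path, "r") as fh:
        return yaml.safe_load(fh)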
@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
    10.*)
        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
    ;;
    *)
        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
    ;;
esac
237 worker-daemon/pvcworkerd/Daemon.py (Executable file)
@ -0,0 +1,237 @@
#!/usr/bin/env python3

# Daemon.py - PVC Node Worker daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

from celery import Celery

import daemon_lib.config as cfg

from daemon_lib.zkhandler import ZKConnection
from daemon_lib.vm import (
    vm_worker_flush_locks,
    vm_worker_attach_device,
    vm_worker_detach_device,
)
from daemon_lib.ceph import (
    osd_worker_add_osd,
    osd_worker_replace_osd,
    osd_worker_refresh_osd,
    osd_worker_remove_osd,
    osd_worker_add_db_vg,
)
from daemon_lib.benchmark import (
    worker_run_benchmark,
)
from daemon_lib.vmbuilder import (
    worker_create_vm,
)

# Daemon version
version = "0.9.83"


config = cfg.get_configuration()
config["daemon_name"] = "pvcworkerd"
config["daemon_version"] = version


celery_task_uri = "redis://{}:{}{}".format(
    config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
    "pvcworkerd",
    broker=celery_task_uri,
    result_backend=celery_task_uri,
    result_extended=True,
)
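To make the broker URI construction concrete, a tiny self-contained illustration with hypothetical keydb_* values (the real values come from pvc.conf):

config = {"keydb_host": "127.0.0.1", "keydb_port": 6379, "keydb_path": "/0"}
celery_task_uri = "redis://{}:{}{}".format(
    config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
assert celery_task_uri == "redis://127.0.0.1:6379/0"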
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
    self,
    vm_name=None,
    profile_name=None,
    define_vm=True,
    start_vm=True,
    script_run_args=[],
    run_on="primary",
):
    return worker_create_vm(
        self,
        config,
        vm_name,
        profile_name,
        define_vm=define_vm,
        start_vm=start_vm,
        script_run_args=script_run_args,
    )
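The tasks defined here are consumed from per-node queues (see the --queues $(hostname -s) argument in pvcworkerd.sh). A hedged sketch of how an API-side client might dispatch provisioner.create to a specific node's queue; the broker URI, node name, and values are hypothetical and the actual API daemon code may differ:

from celery import Celery

client = Celery(
    "pvcapid",
    broker="redis://127.0.0.1:6379/0",   # hypothetical broker URI
    backend="redis://127.0.0.1:6379/0",
)

task = client.send_task(
    "provisioner.create",
    kwargs={
        "vm_name": "test1",            # hypothetical VM name
        "profile_name": "default",     # hypothetical provisioner profile
        "define_vm": True,
        "start_vm": True,
        "run_on": "hv1",               # hypothetical target node
    },
    queue="hv1",                       # must match the worker's per-node queue
)
print(task.id)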
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
|
||||
def storage_benchmark(self, pool=None, run_on="primary"):
|
||||
@ZKConnection(config)
|
||||
def run_storage_benchmark(zkhandler, self, pool):
|
||||
return worker_run_benchmark(zkhandler, self, config, pool)
|
||||
|
||||
return run_storage_benchmark(self, pool)
|
||||
|
||||
|
||||
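Each task wraps its body in an inner function decorated with ZKConnection(config), which injects a connected Zookeeper handler as the first argument. A minimal, self-contained sketch of that injection pattern, assuming a placeholder handler class (the real decorator in daemon_lib.zkhandler manages an actual Zookeeper connection):

from functools import wraps

class PlaceholderZKHandler:
    # Stands in for the real connected handler, purely for illustration
    def disconnect(self):
        pass

def zk_connection(config):
    def decorator(function):
        @wraps(function)
        def connection(*args, **kwargs):
            zkhandler = PlaceholderZKHandler()  # the real decorator connects to Zookeeper here
            try:
                return function(zkhandler, *args, **kwargs)
            finally:
                zkhandler.disconnect()
        return connection
    return decorator

@zk_connection({"coordinators": ["10.0.0.1:2181"]})  # hypothetical config
def run_example(zkhandler, pool):
    return f"{type(zkhandler).__name__} would benchmark pool {pool}"

print(run_example("vms"))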
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
|
||||
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
|
||||
@ZKConnection(config)
|
||||
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
|
||||
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
|
||||
|
||||
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
|
||||
|
||||
|
||||
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
|
||||
def vm_device_attach(self, domain=None, xml=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_vm_device_attach(zkhandler, self, domain, xml):
|
||||
return vm_worker_attach_device(zkhandler, self, domain, xml)
|
||||
|
||||
return run_vm_device_attach(self, domain, xml)
|
||||
|
||||
|
||||
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
|
||||
def vm_device_detach(self, domain=None, xml=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_vm_device_detach(zkhandler, self, domain, xml):
|
||||
return vm_worker_detach_device(zkhandler, self, domain, xml)
|
||||
|
||||
return run_vm_device_detach(self, domain, xml)
|
||||
|
||||
|
||||
@celery.task(name="osd.add", bind=True, routing_key="run_on")
|
||||
def osd_add(
|
||||
self,
|
||||
device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
split_count=None,
|
||||
run_on=None,
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_osd_add(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
device,
|
||||
weight,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
split_count=None,
|
||||
):
|
||||
return osd_worker_add_osd(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
device,
|
||||
weight,
|
||||
ext_db_ratio,
|
||||
ext_db_size,
|
||||
split_count,
|
||||
)
|
||||
|
||||
return run_osd_add(
|
||||
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
|
||||
def osd_replace(
|
||||
self,
|
||||
osd_id=None,
|
||||
new_device=None,
|
||||
old_device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
run_on=None,
|
||||
):
|
||||
@ZKConnection(config)
|
||||
def run_osd_replace(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
osd_id,
|
||||
new_device,
|
||||
old_device=None,
|
||||
weight=None,
|
||||
ext_db_ratio=None,
|
||||
ext_db_size=None,
|
||||
):
|
||||
return osd_worker_replace_osd(
|
||||
zkhandler,
|
||||
self,
|
||||
run_on,
|
||||
osd_id,
|
||||
new_device,
|
||||
old_device,
|
||||
weight,
|
||||
ext_db_ratio,
|
||||
ext_db_size,
|
||||
)
|
||||
|
||||
return run_osd_replace(
|
||||
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
|
||||
)
|
||||
|
||||
|
||||
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
|
||||
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
|
||||
return osd_worker_refresh_osd(
|
||||
zkhandler, self, run_on, osd_id, device, ext_db_flag
|
||||
)
|
||||
|
||||
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
|
||||
|
||||
|
||||
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
|
||||
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_remove(
|
||||
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
|
||||
):
|
||||
return osd_worker_remove_osd(
|
||||
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
|
||||
)
|
||||
|
||||
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
|
||||
|
||||
|
||||
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
|
||||
def osd_add_db_vg(self, device=None, run_on=None):
|
||||
@ZKConnection(config)
|
||||
def run_osd_add_db_vg(zkhandler, self, run_on, device):
|
||||
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
|
||||
|
||||
return run_osd_add_db_vg(self, run_on, device)
|
||||
|
||||
|
||||
def entrypoint():
|
||||
pass
|
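entrypoint() is a stub here because the systemd unit starts the worker through pvcworkerd.sh and the Celery CLI. Were it ever fleshed out, a hypothetical programmatic equivalent (not part of this diff) could use Celery's worker_main with the same arguments the shell wrapper passes:

def entrypoint():
    # Hypothetical body only; mirrors the flags used by pvcworkerd.sh
    celery.worker_main(
        argv=[
            "worker",
            "--events",
            "--concurrency", "3",
            "--loglevel", "INFO",
        ]
    )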
0 worker-daemon/pvcworkerd/__init__.py (Normal file)