Compare commits


28 Commits

SHA1 Message Date
988de1218f Bump version to 0.9.83 2023-12-01 17:37:42 -05:00
0ffcbf3152 Fix bad file paths 2023-12-01 17:25:12 -05:00
ad8d8cf7a7 Avoid removing changelog file until the end
Avoids losing a changelog if something else fails.
2023-12-01 17:23:43 -05:00
915a84ee3c Fix psql check for new configs 2023-12-01 03:58:21 -05:00
6315a068d1 Use SafeLoader for config load 2023-12-01 02:01:24 -05:00
2afd064445 Update CLI to read from pvc.conf 2023-12-01 01:53:33 -05:00
7cb9ebae6b Remove legacy configuration handler
This is not going to be needed.
2023-12-01 01:25:40 -05:00
1fb0463dea Adjust daemon service startup
Add healthd, adjust workerd, lower waittime
2023-11-30 03:28:02 -05:00
13549fc995 Depend pvcnoded on pvcworkerd 2023-11-30 03:24:01 -05:00
102c3c3106 Port all Celery worker functions to discrete pkg
Moves all tasks run by the Celery worker into a discrete package/module
for easier installation. Also adjusts several parameters throughout to
accomplish this.
2023-11-30 02:24:54 -05:00
0c0fb65c62 Rework Flask API to route Celery tasks manually
Avoids needing to define any of these tasks here; they can all be
defined in the pvcworkerd code.
2023-11-30 00:40:09 -05:00
03a738f878 Move config parser into daemon_lib
And reformat/add config values for API.
2023-11-30 00:05:37 -05:00
4df5fdbca6 Update description of example conf 2023-11-29 21:21:51 -05:00
97eb63ebab Clean up config naming and dead files 2023-11-29 21:21:51 -05:00
4a2eba0961 Improve node output messages (from pvchealthd)
1. Output startup "list" entries in cyan with s state
2. Add start of keepalive run message
2023-11-29 21:21:51 -05:00
077dd8708f Add check start message 2023-11-29 21:21:51 -05:00
b6b5786c3b Output list in cyan (s state) 2023-11-29 21:21:51 -05:00
ad738dec40 Clean up plugin pycache too 2023-11-29 21:21:51 -05:00
d2b764a2c7 Output more details on startup 2023-11-29 21:21:51 -05:00
b8aecd9c83 Wait less time between restarts 2023-11-29 21:21:51 -05:00
11db3c5b20 Fix ordering during termination 2023-11-29 21:21:51 -05:00
7a7c975eff Ensure return from health shutdown 2023-11-29 21:21:51 -05:00
fa12a3c9b1 Permit buffered log appending 2023-11-29 21:21:51 -05:00
787f4216b3 Expand Zookeeper log daemon prefix to match 2023-11-29 21:21:51 -05:00
647cba3cf5 Expand startup width for new daemon name 2023-11-29 21:21:51 -05:00
921ecb3a05 Fix name in kydb plugin 2023-11-29 21:21:51 -05:00
6a68cf665b Wait between service restarts 2023-11-29 21:21:51 -05:00
41f4e4fb2f Split health monitoring into discrete daemon/pkg 2023-11-29 21:21:51 -05:00
65 changed files with 1296 additions and 921 deletions

View File

@@ -1 +1 @@
0.9.82
0.9.83

View File

@@ -1,5 +1,17 @@
## PVC Changelog
###### [v0.9.83](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.83)
**Breaking Changes:** This release features a breaking change for the daemon config. A new unified "pvc.conf" file is required for all daemons (and the CLI client for Autobackup and API-on-this-host functionality), which will be written by the "pvc" role in the PVC Ansible framework. Using the "update-pvc-daemons" oneshot playbook from PVC Ansible is **required** to update to this release, as it will ensure this file is written to the proper place before deploying the new package versions, and also ensure that the old entries are cleaned up afterwards. In addition, this release fully splits the node worker and health subsystems into discrete daemons ("pvcworkerd" and "pvchealthd") and packages ("pvc-daemon-worker" and "pvc-daemon-health") respectively. The "pvc-daemon-node" package also now depends on both packages, and the "pvc-daemon-api" package can now be reliably used outside of the PVC nodes themselves (for instance, in a VM) without any strange cross-dependency issues.
* [All] Unifies all daemon (and on-node CLI task) configuration into a "pvc.conf" YAML configuration.
* [All] Splits the node worker subsystem into a discrete codebase and package ("pvc-daemon-worker"), still named "pvcworkerd".
* [All] Splits the node health subsystem into a discrete codebase and package ("pvc-daemon-health"), named "pvchealthd".
* [All] Improves Zookeeper node logging to avoid bugs and to support multiple simultaneous daemon writes.
* [All] Fixes several bugs in file logging and splits file logs by daemon.
* [Node Daemon] Improves several log messages to match new standards from Health daemon.
* [API Daemon] Reworks Celery task routing and handling to move all worker tasks to Worker daemon.
###### [v0.9.82](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.82)
* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).
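For illustration, a minimal sketch of the new unified pvc.conf and how this release's loaders read it. The key layout is inferred from the parsers elsewhere in this compare; every value is a placeholder, and the real file written by the PVC Ansible "pvc" role carries many more sections (node, networks, ceph, logging, and so on).

    import yaml

    # Hypothetical pvc.conf fragment; keys mirror what get_configuration_current()
    # and the CLI's read_config_from_yaml() consume, values are placeholders.
    PVC_CONF_SKETCH = """
    cluster:
      all_nodes: [hv1, hv2, hv3]
      coordinator_nodes: [hv1, hv2, hv3]
    api:
      listen:
        address: 127.0.0.1
        port: 7370
      authentication:
        enabled: true
        secret_key: changeme
        source: token
      token:
        - token: changeme
      ssl:
        enabled: false
        certificate: ""
        private_key: ""
    database:
      postgres:
        hostname: localhost
        port: 5432
        credentials:
          api:
            database: pvcapi
            username: pvcapi
            password: changeme
      keydb:
        hostname: localhost
        port: 6379
        path: /0
    """

    # Per this release: SafeLoader instead of BaseLoader, and the "api" section
    # now sits at the top level rather than under a "pvc" root key.
    api_config = yaml.load(PVC_CONF_SKETCH, Loader=yaml.SafeLoader)["api"]
    assert api_config["listen"]["port"] == 7370
    assert api_config["authentication"]["enabled"] is True  # a real bool, not "True"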

View File

@@ -19,15 +19,15 @@
#
###############################################################################
import os
import yaml
from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg
# Daemon version
version = "0.9.82"
version = "0.9.83"
# API version
API_VERSION = 1.0
@@ -53,160 +53,13 @@ def strtobool(stringv):
# Configuration Parsing
##########################################################
# Parse the configuration file
config_file = None
try:
_config_file = "/etc/pvc/pvcapid.yaml"
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "legacy"
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "current"
except Exception:
pass
if not config_file:
print(
'Error: The "PVC_CONFIG_FILE" environment variable must be set before starting pvcapid.'
)
exit(1)
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvcapid"
config["daemon_version"] = version
def load_configuration_file(config_file):
print('Loading configuration from file "{}"'.format(config_file))
# Read in the config
try:
with open(config_file, "r") as cfgfile:
o_config = yaml.load(cfgfile, Loader=yaml.BaseLoader)
except Exception as e:
print("ERROR: Failed to parse configuration file: {}".format(e))
exit(1)
return o_config
def get_configuration_current(config_file):
o_config = load_configuration_file(config_file)
try:
# Create the config object
config = {
"debug": strtobool(o_config["logging"].get("debug_logging", "False")),
"all_nodes": o_config["cluster"]["all_nodes"],
"coordinators": o_config["cluster"]["coordinator_nodes"],
"listen_address": o_config["api"]["listen"]["address"],
"listen_port": int(o_config["api"]["listen"]["port"]),
"auth_enabled": strtobool(
o_config["api"]["authentication"].get("enabled", "False")
),
"auth_secret_key": o_config["api"]["authentication"]["secret_key"],
"auth_source": o_config["api"]["authentication"]["source"],
"ssl_enabled": strtobool(o_config["api"]["ssl"].get("enabled", "False")),
"ssl_cert_file": o_config["api"]["ssl"]["certificate"],
"ssl_key_file": o_config["api"]["ssl"]["private_key"],
"database_port": o_config["database"]["postgres"]["port"],
"database_host": o_config["database"]["postgres"]["hostname"],
"database_name": o_config["database"]["postgres"]["credentials"]["api"][
"database"
],
"database_user": o_config["database"]["postgres"]["credentials"]["api"][
"username"
],
"database_password": o_config["database"]["postgres"]["credentials"]["api"][
"password"
],
"queue_port": o_config["database"]["keydb"]["port"],
"queue_host": o_config["database"]["keydb"]["hostname"],
"queue_path": o_config["database"]["keydb"]["path"],
"storage_domain": o_config["cluster"]["networks"]["storage"]["domain"],
"storage_hosts": o_config["ceph"].get("monitor_hosts", None),
"ceph_monitor_port": o_config["ceph"]["monitor_port"],
"ceph_storage_secret_uuid": o_config["ceph"]["secret_uuid"],
}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
config["storage_hosts"] = config["coordinators"]
# Set up our token list if specified
if config["auth_source"] == "token":
config["auth_tokens"] = o_config["api"]["token"]
else:
if config["auth_enabled"]:
print(
"WARNING: No authentication method provided; disabling authentication."
)
config["auth_enabled"] = False
except Exception as e:
print(f"ERROR: Failed to load configuration: {e}")
exit(1)
return config
def get_configuration_legacy(config_file):
o_config = load_configuration_file(config_file)
try:
# Create the config object
config = {
"debug": strtobool(o_config["pvc"]["debug"]),
"coordinators": o_config["pvc"]["coordinators"],
"listen_address": o_config["pvc"]["api"]["listen_address"],
"listen_port": int(o_config["pvc"]["api"]["listen_port"]),
"auth_enabled": strtobool(
o_config["pvc"]["api"]["authentication"]["enabled"]
),
"auth_secret_key": o_config["pvc"]["api"]["authentication"]["secret_key"],
"auth_tokens": o_config["pvc"]["api"]["authentication"]["tokens"],
"ssl_enabled": strtobool(o_config["pvc"]["api"]["ssl"]["enabled"]),
"ssl_key_file": o_config["pvc"]["api"]["ssl"]["key_file"],
"ssl_cert_file": o_config["pvc"]["api"]["ssl"]["cert_file"],
"database_host": o_config["pvc"]["provisioner"]["database"]["host"],
"database_port": int(o_config["pvc"]["provisioner"]["database"]["port"]),
"database_name": o_config["pvc"]["provisioner"]["database"]["name"],
"database_user": o_config["pvc"]["provisioner"]["database"]["user"],
"database_password": o_config["pvc"]["provisioner"]["database"]["pass"],
"queue_host": o_config["pvc"]["provisioner"]["queue"]["host"],
"queue_port": o_config["pvc"]["provisioner"]["queue"]["port"],
"queue_path": o_config["pvc"]["provisioner"]["queue"]["path"],
"storage_hosts": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_hosts"
],
"storage_domain": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_domain"
],
"ceph_monitor_port": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_monitor_port"
],
"ceph_storage_secret_uuid": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_storage_secret_uuid"
],
}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"]:
config["storage_hosts"] = config["coordinators"]
except Exception as e:
print("ERROR: Failed to load configuration: {}".format(e))
exit(1)
return config
if config_type == "legacy":
config = get_configuration_legacy(config_file)
else:
config = get_configuration_current(config_file)
##########################################################
# Entrypoint
##########################################################
@@ -215,41 +68,43 @@ else:
def entrypoint():
import pvcapid.flaskapi as pvc_api # noqa: E402
if config["ssl_enabled"]:
if config["api_ssl_enabled"]:
context = SSLContext()
context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs()
context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"])
context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
context = None
# Print our startup messages
print("")
print("|------------------------------------------------------------|")
print("| |")
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
print("| ██ ▜█▙ ▟█▛ ██ |")
print("| ███████████ ▜█▙ ▟█▛ ██ |")
print("| ██ ▜█▙▟█▛ ███████████ |")
print("| |")
print("|------------------------------------------------------------|")
print("| Parallel Virtual Cluster API daemon v{0: <21} |".format(version))
print("| Debug: {0: <51} |".format(str(config["debug"])))
print("| API version: v{0: <44} |".format(API_VERSION))
print("|--------------------------------------------------------------|")
print("| |")
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
print("| ██ ▜█▙ ▟█▛ ██ |")
print("| ███████████ ▜█▙ ▟█▛ ██ |")
print("| ██ ▜█▙▟█▛ ███████████ |")
print("| |")
print("|--------------------------------------------------------------|")
print("| Parallel Virtual Cluster API daemon v{0: <23} |".format(version))
print("| Debug: {0: <53} |".format(str(config["debug"])))
print("| API version: v{0: <46} |".format(API_VERSION))
print(
"| Listen: {0: <50} |".format(
"{}:{}".format(config["listen_address"], config["listen_port"])
"| Listen: {0: <52} |".format(
"{}:{}".format(config["api_listen_address"], config["api_listen_port"])
)
)
print("| SSL: {0: <53} |".format(str(config["ssl_enabled"])))
print("| Authentication: {0: <42} |".format(str(config["auth_enabled"])))
print("|------------------------------------------------------------|")
print("| SSL: {0: <55} |".format(str(config["api_ssl_enabled"])))
print("| Authentication: {0: <44} |".format(str(config["api_auth_enabled"])))
print("|--------------------------------------------------------------|")
print("")
pvc_api.celery_startup()
pvc_api.app.run(
config["listen_address"],
config["listen_port"],
config["api_listen_address"],
config["api_listen_port"],
threaded=True,
ssl_context=context,
)

View File

@@ -31,25 +31,12 @@ from uuid import uuid4
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import list_benchmarks
from pvcapid.Daemon import config, strtobool, API_VERSION
import pvcapid.helper as api_helper
import pvcapid.provisioner as api_provisioner
import pvcapid.vmbuilder as api_vmbuilder
import pvcapid.benchmark as api_benchmark
import pvcapid.ova as api_ova
from flask_sqlalchemy import SQLAlchemy
@@ -61,11 +48,11 @@ app = flask.Flask(__name__)
# Set up SQLAlchemy backend
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
config["database_user"],
config["database_password"],
config["database_host"],
config["database_port"],
config["database_name"],
config["api_postgresql_user"],
config["api_postgresql_password"],
config["api_postgresql_host"],
config["api_postgresql_port"],
config["api_postgresql_dbname"],
)
if config["debug"]:
@@ -73,8 +60,8 @@ if config["debug"]:
else:
app.config["DEBUG"] = False
if config["auth_enabled"]:
app.config["SECRET_KEY"] = config["auth_secret_key"]
if config["api_auth_enabled"]:
app.config["SECRET_KEY"] = config["api_auth_secret_key"]
# Create SQLAlchemy database
db = SQLAlchemy(app)
@@ -99,41 +86,34 @@ def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# Set up Celery task ID generator
# 1. Lets us make our own IDs (first section of UUID)
# 2. Lets us distribute jobs to the required pvcworkerd instances
def run_celery_task(task_name, **kwargs):
task_id = str(uuid4()).split("-")[0]
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
if "run_on" in kwargs and kwargs["run_on"] != "primary":
run_on = kwargs["run_on"]
else:
run_on = get_primary_node()
print(f"Selected Celery worker: {run_on}")
print("----")
return run_on
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
print(
f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
)
task = celery.send_task(
task_name,
task_id=task_id,
kwargs=kwargs,
queue=run_on,
)
return task
# Create celery definition
celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
app.name,
@@ -153,7 +133,6 @@ def celery_startup():
app.config["task_queues"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
app.config["task_routes"] = (route_task,)
celery.conf.update(app.config)
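The net effect of the routing hunks in this file: the declarative task_routes router (route_task) is gone, queue selection happens inline, and tasks are dispatched by name with send_task() to a per-node queue. A standalone sketch of that pattern, with hypothetical broker URL and node names (the real queue list comes from get_all_nodes()):

    from uuid import uuid4

    from celery import Celery
    from kombu import Queue

    celery = Celery("api", broker="redis://localhost:6379/0")

    # One queue per node lets a task be pinned to the node that must execute it.
    celery.conf.task_queues = tuple(
        Queue(node, routing_key=f"{node}.#") for node in ("hv1", "hv2", "hv3")
    )

    def run_celery_task(task_name, **kwargs):
        # Short task IDs: the first section of a UUID4, as in the hunk above.
        task_id = str(uuid4()).split("-")[0]
        # "primary" is resolved to the current primary node in the real code.
        run_on = kwargs.get("run_on", "primary")
        # send_task() dispatches by name, so task bodies live only in pvcworkerd.
        return celery.send_task(task_name, task_id=task_id, kwargs=kwargs, queue=run_on)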
@@ -199,7 +178,7 @@ def Authenticator(function):
@wraps(function)
def authenticate(*args, **kwargs):
# No authentication required
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return function(*args, **kwargs)
# Session-based authentication
if "token" in flask.session:
@@ -208,7 +187,7 @@ def Authenticator(function):
if "X-Api-Key" in flask.request.headers:
if any(
token
for token in config["auth_tokens"]
for token in config["api_auth_tokens"]
if flask.request.headers.get("X-Api-Key") == token.get("token")
):
return function(*args, **kwargs)
@@ -220,171 +199,6 @@ def Authenticator(function):
return authenticate
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return api_vmbuilder.create_vm(
self,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def run_benchmark(self, pool=None, run_on="primary"):
return api_benchmark.run_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
##########################################################
# API Root/Authentication
##########################################################
@@ -469,12 +283,12 @@ class API_Login(Resource):
type: object
id: Message
"""
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root))
if any(
token
for token in config["auth_tokens"]
for token in config["api_auth_tokens"]
if flask.request.values["token"] in token["token"]
):
flask.session["token"] = flask.request.form["token"]
@@ -503,7 +317,7 @@ class API_Logout(Resource):
302:
description: Authentication disabled
"""
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root))
flask.session.pop("token", None)
@@ -2459,7 +2273,7 @@ class API_VM_Locks(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@@ -2594,7 +2408,7 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@@ -2648,7 +2462,7 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@@ -4364,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string (integer)
description: The number of minor page faults during the test
"""
return api_benchmark.list_benchmarks(reqargs.get("job", None))
return list_benchmarks(config, reqargs.get("job", None))
@RequestParser(
[
@@ -4405,7 +4219,7 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400
task = run_celery_task(
run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
)
return (
{
@@ -4531,7 +4345,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
node = reqargs.get("node", None)
task = run_celery_task(
osd_add_db_vg, device=reqargs.get("device", None), run_on=node
"osd.add_db_vg", device=reqargs.get("device", None), run_on=node
)
return (
@@ -4730,7 +4544,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
node = reqargs.get("node", None)
task = run_celery_task(
osd_add,
"osd.add",
device=reqargs.get("device", None),
weight=reqargs.get("weight", None),
ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4849,7 +4663,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_replace,
"osd.replace",
osd_id=osdid,
new_device=reqargs.get("new_device"),
old_device=reqargs.get("old_device", None),
@@ -4907,7 +4721,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_refresh,
"osd.refresh",
osd_id=osdid,
device=reqargs.get("device", None),
ext_db_flag=False,
@@ -4978,7 +4792,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_remove,
"osd.remove",
osd_id=osdid,
force_flag=reqargs.get("force", False),
run_on=node,
@@ -8507,7 +8321,7 @@ class API_Provisioner_Create_Root(Resource):
start_vm = False
task = run_celery_task(
create_vm,
"provisioner.create",
vm_name=reqargs.get("name", None),
profile_name=reqargs.get("profile", None),
define_vm=define_vm,
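On the receiving side, every dotted task name the API dispatches ("vm.flush_locks", "osd.add", "storage.benchmark", "provisioner.create", and so on) must be registered by pvcworkerd. The worker codebase is not part of this compare, but judging by the definitions removed from this file, a matching worker-side registration could look roughly like this (broker URL hypothetical, body elided):

    from celery import Celery

    celery = Celery("pvcworkerd", broker="redis://localhost:6379/0")

    @celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
    def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
        # The real task calls vm_worker_flush_locks() from daemon_lib; only the
        # name contract matters to the API side's send_task() caller.
        ...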

View File

@@ -48,11 +48,11 @@ import pvcapid.provisioner as provisioner
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur

View File

@@ -63,11 +63,11 @@ class ProvisioningError(Exception):
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur

View File

@@ -1,16 +0,0 @@
# Parallel Virtual Cluster Celery Worker daemon unit file
[Unit]
Description = Parallel Virtual Cluster Celery Worker daemon
After = network-online.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStart = /usr/share/pvc/pvcworkerd.sh
Restart = on-failure
[Install]
WantedBy = multi-user.target

View File

@@ -42,7 +42,7 @@ echo " done. Package version ${version}."
# Install the client(s) locally
echo -n "Installing client packages locally..."
$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null
$SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
echo " done".
for HOST in ${HOSTS[@]}; do
@@ -59,12 +59,16 @@ fi
for HOST in ${HOSTS[@]}; do
echo "> Deploying packages to host ${HOST}"
echo -n "Installing packages..."
ssh $HOST $SUDO dpkg -i /tmp/pvc/*.deb &>/dev/null
ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
ssh $HOST rm -rf /tmp/pvc &>/dev/null
echo " done."
echo -n "Restarting PVC daemons..."
ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
echo " done."
echo -n "Waiting for node daemon to be running..."

View File

@@ -13,9 +13,11 @@ echo ${new_ver} >&3
tmpdir=$( mktemp -d )
cp -a debian/changelog client-cli/setup.py ${tmpdir}/
cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
# Replace the "base" version with the git revision version
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
cat <<EOF > debian/changelog
pvc (${new_ver}) unstable; urgency=medium
@@ -33,6 +35,8 @@ dpkg-buildpackage -us -uc
cp -a ${tmpdir}/changelog debian/changelog
cp -a ${tmpdir}/setup.py client-cli/setup.py
cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
# Clean up

View File

@@ -17,9 +17,10 @@ echo "# Write the changelog below; comments will be ignored" >> ${changelog_file
$EDITOR ${changelog_file}
changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
rm ${changelog_file}
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," health-daemon/pvchealthd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," worker-daemon/pvcworkerd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
echo ${new_version} > .version
@@ -46,11 +47,13 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
mv ${deb_changelog_file} debian/changelog
git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git commit -v
popd &>/dev/null
rm ${changelog_file}
echo
echo "Release message:"
echo

View File

@@ -5960,8 +5960,8 @@ def cli(
"PVC_COLOUR": Force colour on the output even if Click determines it is not a console (e.g. with 'watch')
If a "-c"/"--connection"/"PVC_CONNECTION" is not specified, the CLI will attempt to read a "local" connection
from the API configuration at "/etc/pvc/pvcapid.yaml". If no such configuration is found, the command will
abort with an error. This applies to all commands except those under "connection".
from the API configuration at "/etc/pvc/pvc.conf". If no such configuration is found, the command will abort
with an error. This applies to all commands except those under "connection".
"""
global CLI_CONFIG

View File

@@ -33,14 +33,14 @@ from subprocess import run, PIPE
from sys import argv
from syslog import syslog, openlog, closelog, LOG_AUTH
from yaml import load as yload
from yaml import BaseLoader, SafeLoader
from yaml import SafeLoader
import pvc.lib.provisioner
import pvc.lib.vm
import pvc.lib.node
DEFAULT_STORE_DATA = {"cfgfile": "/etc/pvc/pvcapid.yaml"}
DEFAULT_STORE_DATA = {"cfgfile": "/etc/pvc/pvc.conf"}
DEFAULT_STORE_FILENAME = "pvc.json"
DEFAULT_API_PREFIX = "/api/v1"
DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0]
@@ -88,14 +88,15 @@ def read_config_from_yaml(cfgfile):
try:
with open(cfgfile) as fh:
api_config = yload(fh, Loader=BaseLoader)["pvc"]["api"]
api_config = yload(fh, Loader=SafeLoader)["api"]
host = api_config["listen_address"]
port = api_config["listen_port"]
scheme = "https" if strtobool(api_config["ssl"]["enabled"]) else "http"
host = api_config["listen"]["address"]
port = api_config["listen"]["port"]
scheme = "https" if api_config["ssl"]["enabled"] else "http"
api_key = (
api_config["authentication"]["tokens"][0]["token"]
if strtobool(api_config["authentication"]["enabled"])
api_config["token"][0]["token"]
if api_config["authentication"]["enabled"]
and api_config["authentication"]["source"] == "token"
else None
)
except KeyError:

View File

@@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="pvc",
version="0.9.82",
version="0.9.83",
packages=["pvc.cli", "pvc.lib"],
install_requires=[
"Click",

View File

@@ -25,9 +25,6 @@ import psycopg2.extras
from datetime import datetime
from json import loads, dumps
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
@@ -135,11 +132,11 @@ def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur
@@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
conn.close()
def list_benchmarks(job=None):
def list_benchmarks(config, job=None):
if job is not None:
query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
args = (job,)
@@ -278,17 +275,8 @@ def run_benchmark_job(
return jstdout
def run_benchmark(self, pool):
def worker_run_benchmark(zkhandler, celery, config, pool):
# Phase 0 - connect to databases
try:
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
fail(
self,
"Failed to connect to Zookeeper",
)
cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}"
@@ -296,7 +284,7 @@ def run_benchmark(self, pool):
current_stage = 0
total_stages = 13
start(
self,
celery,
f"Running storage benchmark '{job_name}' on pool '{pool}'",
current=current_stage,
total=total_stages,
@@ -312,13 +300,13 @@ def run_benchmark(self, pool):
zkhandler=zkhandler,
)
fail(
self,
celery,
"Failed to connect to Postgres",
)
current_stage += 1
update(
self,
celery,
"Storing running status in database",
current=current_stage,
total=total_stages,
@@ -340,11 +328,11 @@ def run_benchmark(self, pool):
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)
current_stage += 1
update(
self,
celery,
"Creating benchmark volume",
current=current_stage,
total=total_stages,
@@ -363,7 +351,7 @@ def run_benchmark(self, pool):
for test in test_matrix:
current_stage += 1
update(
self,
celery,
f"Running benchmark job '{test}'",
current=current_stage,
total=total_stages,
@@ -381,7 +369,7 @@ def run_benchmark(self, pool):
# Phase 3 - cleanup
current_stage += 1
update(
self,
celery,
"Cleaning up venchmark volume",
current=current_stage,
total=total_stages,
@@ -397,7 +385,7 @@ def run_benchmark(self, pool):
current_stage += 1
update(
self,
celery,
"Storing results in database",
current=current_stage,
total=total_stages,
@@ -415,7 +403,7 @@ def run_benchmark(self, pool):
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)
cleanup(
job_name,
@@ -426,7 +414,7 @@ def run_benchmark(self, pool):
current_stage += 1
return finish(
self,
celery,
f"Storage benchmark {job_name} completed successfully",
current=current_stage,
total=total_stages,

View File

@@ -19,8 +19,6 @@
#
###############################################################################
import daemon_lib.common as common
import os
import subprocess
import yaml
@@ -29,7 +27,6 @@ from socket import gethostname
from re import findall
from psutil import cpu_count
from ipaddress import ip_address, ip_network
from json import loads
class MalformedConfigurationError(Exception):
@@ -70,29 +67,16 @@ def get_static_data():
def get_configuration_path():
config_file = None
try:
_config_file = "/etc/pvc/pvcnoded.yaml"
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "legacy"
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "current"
except Exception:
pass
if not config_file:
print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
os._exit(1)
return config_file, config_type
return config_file
def get_hostname():
@@ -137,7 +121,7 @@ def validate_floating_ip(config, network):
return True, ""
def get_configuration_current(config_file):
def get_parsed_configuration(config_file):
print('Loading configuration from file "{}"'.format(config_file))
with open(config_file, "r") as cfgfh:
@@ -271,17 +255,17 @@ def get_configuration_current(config_file):
"keydb_port": o_database["keydb"]["port"],
"keydb_host": o_database["keydb"]["hostname"],
"keydb_path": o_database["keydb"]["path"],
"metadata_postgresql_port": o_database["postgres"]["port"],
"metadata_postgresql_host": o_database["postgres"]["hostname"],
"metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"api_postgresql_port": o_database["postgres"]["port"],
"api_postgresql_host": o_database["postgres"]["hostname"],
"api_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"database"
],
"metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
"api_postgresql_user": o_database["postgres"]["credentials"]["api"][
"username"
],
"metadata_postgresql_password": o_database["postgres"]["credentials"][
"api"
]["password"],
"api_postgresql_password": o_database["postgres"]["credentials"]["api"][
"password"
],
"pdns_postgresql_port": o_database["postgres"]["port"],
"pdns_postgresql_host": o_database["postgres"]["hostname"],
"pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
@@ -335,9 +319,7 @@ def get_configuration_current(config_file):
"log_keepalive_cluster_details": o_logging.get(
"log_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_monitoring_details", False
),
"log_monitoring_details": o_logging.get("log_monitoring_details", False),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
@@ -362,323 +344,64 @@ def get_configuration_current(config_file):
+ o_ceph["ceph_keyring_file"],
"ceph_monitor_port": o_ceph["monitor_port"],
"ceph_secret_uuid": o_ceph["secret_uuid"],
"storage_hosts": o_ceph.get("monitor_hosts", None),
}
config = {**config, **config_ceph}
# Add our node static data to the config
config["static_data"] = get_static_data()
o_api = o_config["api"]
except Exception as e:
raise MalformedConfigurationError(e)
return config
def get_configuration_legacy(pvcnoded_config_file):
print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
with open(pvcnoded_config_file, "r") as cfgfile:
try:
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
except Exception as e:
print("ERROR: Failed to parse configuration file: {}".format(e))
os._exit(1)
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
# Create the configuration dictionary
config = dict()
# Get the initial base configuration
try:
o_base = o_config["pvc"]
o_cluster = o_config["pvc"]["cluster"]
except Exception as e:
raise MalformedConfigurationError(e)
config_general = {
"node": o_base.get("node", node_hostname),
"node_hostname": node_hostname,
"node_fqdn": node_fqdn,
"node_domain": node_domain,
"node_id": node_id,
"coordinators": o_cluster.get("coordinators", list()),
"debug": o_base.get("debug", False),
}
config = {**config, **config_general}
# Get the functions configuration
try:
o_functions = o_config["pvc"]["functions"]
except Exception as e:
raise MalformedConfigurationError(e)
config_functions = {
"enable_hypervisor": o_functions.get("enable_hypervisor", False),
"enable_networking": o_functions.get("enable_networking", False),
"enable_storage": o_functions.get("enable_storage", False),
"enable_worker": o_functions.get("enable_worker", True),
"enable_api": o_functions.get("enable_api", False),
}
config = {**config, **config_functions}
# Get the directory configuration
try:
o_directories = o_config["pvc"]["system"]["configuration"]["directories"]
except Exception as e:
raise MalformedConfigurationError(e)
config_directories = {
"plugin_directory": o_directories.get(
"plugin_directory", "/usr/share/pvc/plugins"
),
"dynamic_directory": o_directories.get("dynamic_directory", None),
"log_directory": o_directories.get("log_directory", None),
"console_log_directory": o_directories.get("console_log_directory", None),
}
# Define our dynamic directory schema
config_directories["dnsmasq_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/dnsmasq"
)
config_directories["pdns_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/pdns"
)
config_directories["nft_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/nft"
)
# Define our log directory schema
config_directories["dnsmasq_log_directory"] = (
config_directories["log_directory"] + "/dnsmasq"
)
config_directories["pdns_log_directory"] = (
config_directories["log_directory"] + "/pdns"
)
config_directories["nft_log_directory"] = (
config_directories["log_directory"] + "/nft"
)
config = {**config, **config_directories}
# Get the logging configuration
try:
o_logging = o_config["pvc"]["system"]["configuration"]["logging"]
except Exception as e:
raise MalformedConfigurationError(e)
config_logging = {
"file_logging": o_logging.get("file_logging", False),
"stdout_logging": o_logging.get("stdout_logging", False),
"zookeeper_logging": o_logging.get("zookeeper_logging", False),
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_keepalive_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_keepalive_plugin_details", False
),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
config = {**config, **config_logging}
# Get the interval configuration
try:
o_intervals = o_config["pvc"]["system"]["intervals"]
except Exception as e:
raise MalformedConfigurationError(e)
config_intervals = {
"vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
"keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
"monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
"fence_intervals": int(o_intervals.get("fence_intervals", 6)),
"suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
}
config = {**config, **config_intervals}
# Get the fencing configuration
try:
o_fencing = o_config["pvc"]["system"]["fencing"]
o_fencing_actions = o_fencing["actions"]
o_fencing_ipmi = o_fencing["ipmi"]
except Exception as e:
raise MalformedConfigurationError(e)
config_fencing = {
"successful_fence": o_fencing_actions.get("successful_fence", None),
"failed_fence": o_fencing_actions.get("failed_fence", None),
"ipmi_hostname": o_fencing_ipmi.get(
"host", f"{node_hostname}-lom.{node_domain}"
),
"ipmi_username": o_fencing_ipmi.get("user", "null"),
"ipmi_password": o_fencing_ipmi.get("pass", "null"),
}
config = {**config, **config_fencing}
# Get the migration configuration
try:
o_migration = o_config["pvc"]["system"]["migration"]
except Exception as e:
raise MalformedConfigurationError(e)
config_migration = {
"migration_target_selector": o_migration.get("target_selector", "mem"),
}
config = {**config, **config_migration}
if config["enable_networking"]:
# Get the node networks configuration
try:
o_networks = o_config["pvc"]["cluster"]["networks"]
o_network_cluster = o_networks["cluster"]
o_network_storage = o_networks["storage"]
o_network_upstream = o_networks["upstream"]
o_sysnetworks = o_config["pvc"]["system"]["configuration"]["networking"]
o_sysnetwork_cluster = o_sysnetworks["cluster"]
o_sysnetwork_storage = o_sysnetworks["storage"]
o_sysnetwork_upstream = o_sysnetworks["upstream"]
except Exception as e:
raise MalformedConfigurationError(e)
config_networks = {
"cluster_domain": o_network_cluster.get("domain", None),
"cluster_network": o_network_cluster.get("network", None),
"cluster_floating_ip": o_network_cluster.get("floating_ip", None),
"cluster_dev": o_sysnetwork_cluster.get("device", None),
"cluster_mtu": o_sysnetwork_cluster.get("mtu", None),
"cluster_dev_ip": o_sysnetwork_cluster.get("address", None),
"storage_domain": o_network_storage.get("domain", None),
"storage_network": o_network_storage.get("network", None),
"storage_floating_ip": o_network_storage.get("floating_ip", None),
"storage_dev": o_sysnetwork_storage.get("device", None),
"storage_mtu": o_sysnetwork_storage.get("mtu", None),
"storage_dev_ip": o_sysnetwork_storage.get("address", None),
"upstream_domain": o_network_upstream.get("domain", None),
"upstream_network": o_network_upstream.get("network", None),
"upstream_floating_ip": o_network_upstream.get("floating_ip", None),
"upstream_gateway": o_network_upstream.get("gateway", None),
"upstream_dev": o_sysnetwork_upstream.get("device", None),
"upstream_mtu": o_sysnetwork_upstream.get("mtu", None),
"upstream_dev_ip": o_sysnetwork_upstream.get("address", None),
"bridge_dev": o_sysnetworks.get("bridge_device", None),
"bridge_mtu": o_sysnetworks.get("bridge_mtu", None),
"enable_sriov": o_sysnetworks.get("sriov_enable", False),
"sriov_device": o_sysnetworks.get("sriov_device", list()),
o_api_listen = o_api["listen"]
config_api_listen = {
"api_listen_address": o_api_listen["address"],
"api_listen_port": o_api_listen["port"],
}
config = {**config, **config_api_listen}
if config_networks["bridge_mtu"] is None:
# Read the current MTU of bridge_dev and set bridge_mtu to it; avoids weird resets
retcode, stdout, stderr = common.run_os_command(
f"ip -json link show dev {config_networks['bridge_dev']}"
)
current_bridge_mtu = loads(stdout)[0]["mtu"]
print(
f"Config key bridge_mtu not explicitly set; using live MTU {current_bridge_mtu} from {config_networks['bridge_dev']}"
)
config_networks["bridge_mtu"] = current_bridge_mtu
o_api_authentication = o_api["authentication"]
config_api_authentication = {
"api_auth_enabled": o_api_authentication.get("enabled", False),
"api_auth_secret_key": o_api_authentication.get("secret_key", ""),
"api_auth_source": o_api_authentication.get("source", "token"),
}
config = {**config, **config_api_authentication}
config = {**config, **config_networks}
o_api_ssl = o_api["ssl"]
config_api_ssl = {
"api_ssl_enabled": o_api_ssl.get("enabled", False),
"api_ssl_cert_file": o_api_ssl.get("certificate", None),
"api_ssl_key_file": o_api_ssl.get("private_key", None),
}
config = {**config, **config_api_ssl}
for network_type in ["cluster", "storage", "upstream"]:
result, msg = validate_floating_ip(config, network_type)
if not result:
raise MalformedConfigurationError(msg)
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
config["storage_hosts"] = config["coordinators"]
address_key = "{}_dev_ip".format(network_type)
network_key = f"{network_type}_network"
network = ip_network(config[network_key])
# With autoselection of addresses, construct an IP from the relevant network
if config[address_key] == "by-id":
# The NodeID starts at 1, but indexes start at 0
address_id = int(config["node_id"]) - 1
# Grab the nth address from the network
config[address_key] = "{}/{}".format(
list(network.hosts())[address_id], network.prefixlen
# Set up our token list if specified
if config["api_auth_source"] == "token":
config["api_auth_tokens"] = o_api["token"]
else:
if config["api_auth_enabled"]:
print(
"WARNING: No authentication method provided; disabling API authentication."
)
# Validate the provided IP instead
else:
try:
address = ip_address(config[address_key].split("/")[0])
if address not in list(network.hosts()):
raise
except Exception:
raise MalformedConfigurationError(
f"IP address {config[address_key]} for {address_key} is not valid"
)
# Get the PowerDNS aggregator database configuration
try:
o_pdnsdb = o_config["pvc"]["coordinator"]["dns"]["database"]
except Exception as e:
raise MalformedConfigurationError(e)
config_pdnsdb = {
"pdns_postgresql_host": o_pdnsdb.get("host", None),
"pdns_postgresql_port": o_pdnsdb.get("port", None),
"pdns_postgresql_dbname": o_pdnsdb.get("name", None),
"pdns_postgresql_user": o_pdnsdb.get("user", None),
"pdns_postgresql_password": o_pdnsdb.get("pass", None),
}
config = {**config, **config_pdnsdb}
# Get the Cloud-Init Metadata database configuration
try:
o_metadatadb = o_config["pvc"]["coordinator"]["metadata"]["database"]
except Exception as e:
raise MalformedConfigurationError(e)
config_metadatadb = {
"metadata_postgresql_host": o_metadatadb.get("host", None),
"metadata_postgresql_port": o_metadatadb.get("port", None),
"metadata_postgresql_dbname": o_metadatadb.get("name", None),
"metadata_postgresql_user": o_metadatadb.get("user", None),
"metadata_postgresql_password": o_metadatadb.get("pass", None),
}
config = {**config, **config_metadatadb}
if config["enable_storage"]:
# Get the storage configuration
try:
o_storage = o_config["pvc"]["system"]["configuration"]["storage"]
except Exception as e:
raise MalformedConfigurationError(e)
config_storage = {
"ceph_config_file": o_storage.get("ceph_config_file", None),
"ceph_admin_keyring": o_storage.get("ceph_admin_keyring", None),
}
config = {**config, **config_storage}
config["api_auth_enabled"] = False
# Add our node static data to the config
config["static_data"] = get_static_data()
except Exception as e:
raise MalformedConfigurationError(e)
return config
def get_configuration():
"""
Parse the configuration of the node daemon.
Get the configuration.
"""
pvc_config_file, pvc_config_type = get_configuration_path()
if pvc_config_type == "legacy":
config = get_configuration_legacy(pvc_config_file)
else:
config = get_configuration_current(pvc_config_file)
pvc_config_file = get_configuration_path()
config = get_parsed_configuration(pvc_config_file)
return config
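With the legacy path probing removed, every daemon now resolves its configuration solely from the PVC_CONFIG_FILE environment variable, which the packaged systemd units set to /etc/pvc/pvc.conf. A condensed sketch of the simplified lookup (the flattened hunk above interleaves the old and new logic):

    import os

    def get_configuration_path() -> str:
        # Only the environment variable is consulted; the old fallback probe of
        # /etc/pvc/pvcnoded.yaml is gone along with the "legacy" config type.
        config_file = os.environ.get("PVC_CONFIG_FILE")
        if config_file is None or not os.path.exists(config_file):
            print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
            raise SystemExit(1)
        return config_file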

View File

@@ -80,7 +80,7 @@ class Logger(object):
self.config["log_directory"] + "/" + self.config["daemon_name"] + ".log"
)
# We open the logfile for the duration of our session, but have a hup function
self.writer = open(self.logfile, "a", buffering=0)
self.writer = open(self.logfile, "a")
self.last_colour = ""
self.last_prompt = ""
@@ -93,12 +93,10 @@ class Logger(object):
# Provide a hup function to close and reopen the writer
def hup(self):
self.writer.close()
self.writer = open(self.logfile, "a", buffering=0)
self.writer = open(self.logfile, "a")
# Provide a termination function so all messages are flushed before terminating the main daemon
def terminate(self):
if self.config["file_logging"]:
self.writer.close()
if self.config["zookeeper_logging"]:
self.out("Waiting for Zookeeper message queue to drain", state="s")
@@ -112,6 +110,9 @@ class Logger(object):
self.zookeeper_logger.stop()
self.zookeeper_logger.join()
if self.config["file_logging"]:
self.writer.close()
# Output function
def out(self, message, state=None, prefix=""):
# Get the date
@@ -152,11 +153,14 @@ class Logger(object):
# Assemble output string
output = colour + prompt + endc + date + prefix + message
self.writer.write(output + "\n")
self.writer.flush()
# Log to Zookeeper
if self.config["zookeeper_logging"]:
# Set the daemon value (only used here as others do not overlap with different daemons)
daemon = f"{self.config['daemon_name']}: "
# Expand to match all daemon names (pvcnoded, pvcworkerd, pvchealthd)
daemon = "{daemon: <12}".format(daemon=daemon)
# Assemble output string
output = daemon + colour + prompt + endc + date + prefix + message
self.zookeeper_queue.put(output)
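For context on the file-logging hunks above: Python 3 refuses unbuffered text-mode files (open(path, "a", buffering=0) raises ValueError), so the writer is now opened with default buffering and explicitly flushed after every message, and terminate() closes it only after the Zookeeper queue has drained. A minimal sketch of the write-then-flush pattern:

    writer = open("/tmp/pvc-example.log", "a")  # hypothetical path; default buffering

    def log(message: str) -> None:
        writer.write(message + "\n")
        writer.flush()  # hand the line to the OS before the next keepalive runs

    log("keepalive OK")
    writer.close()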

View File

@@ -31,8 +31,6 @@ import uuid
from contextlib import contextmanager
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
@@ -167,11 +165,11 @@ def chroot(destination):
def open_db(config):
try:
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception:
@@ -216,8 +214,14 @@ def open_zk(config):
#
# Main VM provisioning function - executed by the Celery worker
#
def create_vm(
celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
def worker_create_vm(
celery,
config,
vm_name,
vm_profile,
define_vm=True,
start_vm=True,
script_run_args=[],
):
current_stage = 0
total_stages = 11
@@ -326,9 +330,9 @@ def create_vm(
vm_data["system_architecture"] = stdout.strip()
monitor_list = list()
coordinator_names = config["storage_hosts"]
for coordinator in coordinator_names:
monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
monitor_names = config["storage_hosts"]
for monitor in monitor_names:
monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
vm_data["ceph_monitor_list"] = monitor_list
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]

debian/changelog (vendored; 14 changed lines)
View File

@@ -1,3 +1,17 @@
pvc (0.9.83-0) unstable; urgency=high
**Breaking Changes:** This release features a breaking change for the daemon config. A new unified "pvc.conf" file is required for all daemons (and the CLI client for Autobackup and API-on-this-host functionality), which will be written by the "pvc" role in the PVC Ansible framework. Using the "update-pvc-daemons" oneshot playbook from PVC Ansible is **required** to update to this release, as it will ensure this file is written to the proper place before deploying the new package versions, and also ensure that the old entries are cleaned up afterwards. In addition, this release fully splits the node worker and health subsystems into discrete daemons ("pvcworkerd" and "pvchealthd") and packages ("pvc-daemon-worker" and "pvc-daemon-health") respectively. The "pvc-daemon-node" package also now depends on both packages, and the "pvc-daemon-api" package can now be reliably used outside of the PVC nodes themselves (for instance, in a VM) without any strange cross-dependency issues.
* [All] Unifies all daemon (and on-node CLI task) configuration into a "pvc.conf" YAML configuration.
* [All] Splits the node worker subsystem into a discrete codebase and package ("pvc-daemon-worker"), still named "pvcworkerd".
* [All] Splits the node health subsystem into a discrete codebase and package ("pvc-daemon-health"), named "pvchealthd".
* [All] Improves Zookeeper node logging to avoid bugs and to support multiple simultaneous daemon writes.
* [All] Fixes several bugs in file logging and splits file logs by daemon.
* [Node Daemon] Improves several log messages to match new standards from Health daemon.
* [API Daemon] Reworks Celery task routing and handling to move all worker tasks to Worker daemon.
-- Joshua M. Boniface <joshua@boniface.me> Fri, 01 Dec 2023 17:33:53 -0500
pvc (0.9.82-0) unstable; urgency=high
* [API Daemon] Fixes a bug where the Celery result_backend was not loading properly on Celery <5.2.x (Debian 10/11).

debian/control (vendored; 22 changed lines)
View File

@@ -4,19 +4,35 @@ Priority: optional
Maintainer: Joshua Boniface <joshua@boniface.me>
Standards-Version: 3.9.8
Homepage: https://www.boniface.me
X-Python3-Version: >= 3.2
X-Python3-Version: >= 3.7
Package: pvc-daemon-node
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Depends: systemd, pvc-daemon-common, pvc-daemon-health, pvc-daemon-worker, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Description: Parallel Virtual Cluster node daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC node daemon
Package: pvc-daemon-health
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
Description: Parallel Virtual Cluster health daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC health monitoring daemon
Package: pvc-daemon-worker
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
Description: Parallel Virtual Cluster worker daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC Celery task worker daemon
Package: pvc-daemon-api
Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.

View File

@ -3,7 +3,5 @@ api-daemon/pvcapid-manage*.py usr/share/pvc
api-daemon/pvc-api-db-upgrade usr/share/pvc
api-daemon/pvcapid usr/share/pvc
api-daemon/pvcapid.service lib/systemd/system
api-daemon/pvcworkerd.service lib/systemd/system
api-daemon/pvcworkerd.sh usr/share/pvc
api-daemon/provisioner usr/share/pvc
api-daemon/migrations usr/share/pvc

debian/pvc-daemon-api.preinst (new file)
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.install (new file)
View File

@ -0,0 +1,4 @@
health-daemon/pvchealthd.py usr/share/pvc
health-daemon/pvchealthd usr/share/pvc
health-daemon/pvchealthd.service lib/systemd/system
health-daemon/plugins usr/share/pvc

debian/pvc-daemon-health.postinst (new file)
View File

@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvchealthd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvchealthd.service; then
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

debian/pvc-daemon-health.preinst (new file)
View File

@ -0,0 +1,6 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/plugins -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.prerm (new file)
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvchealthd.service

View File

@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvcautoready.service lib/systemd/system
node-daemon/monitoring usr/share/pvc
node-daemon/plugins usr/share/pvc

View File

@ -2,4 +2,4 @@
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.install (new file)
View File

@ -0,0 +1,4 @@
worker-daemon/pvcworkerd.sh usr/share/pvc
worker-daemon/pvcworkerd.py usr/share/pvc
worker-daemon/pvcworkerd usr/share/pvc
worker-daemon/pvcworkerd.service lib/systemd/system

debian/pvc-daemon-worker.postinst (new file)
View File

@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvcworkerd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvcworkerd.service; then
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

debian/pvc-daemon-worker.preinst (new file)
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.prerm (new file)
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvcworkerd.service

View File

@ -8,7 +8,7 @@ import os
import sys
import json
os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"
sys.path.append('api-daemon')

health-daemon/daemon_lib (symbolic link)
View File

@ -0,0 +1 @@
../daemon-common

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -40,7 +40,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
@ -75,7 +75,7 @@ class MonitoringPluginScript(MonitoringPlugin):
# Set the health delta to 0 (no change)
health_delta = 0
# Craft a message that can be used by the clients
message = "Successfully connected to Libvirtd on localhost"
message = "Successfully connected to KeyDB/Redis on localhost"
# Check the Zookeeper connection
try:

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
@ -67,8 +67,8 @@ class MonitoringPluginScript(MonitoringPlugin):
# Run any imports first
from psycopg2 import connect
conn_metadata = None
cur_metadata = None
conn_api = None
cur_api = None
conn_dns = None
cur_dns = None
@ -79,25 +79,25 @@ class MonitoringPluginScript(MonitoringPlugin):
# Check the Metadata database (primary)
try:
conn_metadata = connect(
conn_api = connect(
host=self.this_node.name,
port=self.config["metadata_postgresql_port"],
dbname=self.config["metadata_postgresql_dbname"],
user=self.config["metadata_postgresql_user"],
password=self.config["metadata_postgresql_password"],
port=self.config["api_postgresql_port"],
dbname=self.config["api_postgresql_dbname"],
user=self.config["api_postgresql_user"],
password=self.config["api_postgresql_password"],
)
cur_metadata = conn_metadata.cursor()
cur_metadata.execute("""SELECT * FROM alembic_version""")
data = cur_metadata.fetchone()
cur_api = conn_api.cursor()
cur_api.execute("""SELECT * FROM alembic_version""")
data = cur_api.fetchone()
except Exception as e:
health_delta = 50
err = str(e).split('\n')[0]
message = f"Failed to connect to PostgreSQL database {self.config['metadata_postgresql_dbname']}: {err}"
message = f"Failed to connect to PostgreSQL database {self.config['api_postgresql_dbname']}: {err}"
finally:
if cur_metadata is not None:
cur_metadata.close()
if conn_metadata is not None:
conn_metadata.close()
if cur_api is not None:
cur_api.close()
if conn_api is not None:
conn_api.close()
if health_delta == 0:
# Check the PowerDNS database (secondary)

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

health-daemon/pvchealthd.py (new executable file)
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvchealthd.py - Health daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.Daemon # noqa: F401
pvchealthd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file
[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Daemon.py - Health daemon main entrypoint
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.util.zookeeper
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
import pvchealthd.objects.NodeInstance as NodeInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
from time import sleep
import os
import signal
# Daemon version
version = "0.9.83"
##########################################################
# Entrypoint
##########################################################
def entrypoint():
monitoring_instance = None
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvchealthd"
config["daemon_version"] = version
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster health daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
# Connect to Zookeeper and return our handler and current schema version
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, monitoring_instance
logger.out("Terminating pvchealthd and cleaning up", state="s")
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
del zkhandler
except Exception:
pass
logger.out("Terminated health daemon", state="s")
logger.terminate()
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum="", frame=""):
cleanup(failure=False)
# Hangup (logrotate) function
def hup(signum="", frame=""):
if config["file_logging"]:
logger.hup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGHUP, hup)
this_node = NodeInstance.NodeInstance(
config["node_hostname"],
zkhandler,
config,
logger,
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Tick loop; does nothing since everything is async
while True:
try:
sleep(1)
except Exception:
break

View File

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# PluginInstance.py - Class implementing a PVC monitoring instance
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
@ -317,8 +317,17 @@ class MonitoringInstance(object):
)
if successful_plugins < 1:
self.logger.out(
"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
state="e",
)
return
self.logger.out(
f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
state="s",
)
# Clean up any old plugin data for which a plugin file no longer exists
plugins_data = self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
@ -344,6 +353,7 @@ class MonitoringInstance(object):
def shutdown(self):
self.stop_check_timer()
self.run_cleanups()
return
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
@ -395,6 +405,11 @@ class MonitoringInstance(object):
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
self.logger.out(
"Starting monitoring healthcheck run",
state="t",
)
total_health = 100
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
@ -406,10 +421,7 @@ class MonitoringInstance(object):
plugin_results.append(future.result())
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
if (
self.config["log_keepalives"]
and self.config["log_keepalive_plugin_details"]
):
if self.config["log_monitoring_details"]:
self.logger.out(
result.message + f" [-{result.health_delta}]",
state="t",

View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
class NodeInstance(object):
# Initialization function
def __init__(
self,
name,
zkhandler,
config,
logger,
):
# Passed-in variables on creation
self.name = name
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# States
self.daemon_state = "stop"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Node resources
self.health = 100
self.active_domains_count = 0
self.provisioned_domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.daemon", self.name)
)
def watch_node_daemonstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "stop"
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.router", self.name)
)
def watch_node_routerstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "client"
if data != self.coordinator_state:
self.coordinator_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.domain", self.name)
)
def watch_node_domainstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "unknown"
if data != self.domain_state:
self.domain_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.monitoring.health", self.name)
)
def watch_node_health(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 100
try:
data = int(data)
except ValueError:
pass
if data != self.health:
self.health = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.free", self.name)
)
def watch_node_memfree(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.used", self.name)
)
def watch_node_memused(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.allocated", self.name)
)
def watch_node_memalloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
)
def watch_node_vcpualloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.running_domains", self.name)
)
def watch_node_runningdomains(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii").split()
except AttributeError:
data = []
if len(data) != self.active_domains_count:
self.active_domains_count = len(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
)
def watch_node_domainscount(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.provisioned_domains_count:
self.provisioned_domains_count = data
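Each watcher in this new class repeats the same decode/default/compare boilerplate. As a purely hypothetical refactor (not part of the codebase), a single factory could register them all while keeping the same kazoo DataWatch semantics:

```python
# Hypothetical helper factoring out the repeated DataWatch pattern above;
# register_state_watch is not in the actual codebase.
def register_state_watch(self, schema_key, attribute, default, transform=None):
    @self.zkhandler.zk_conn.DataWatch(
        self.zkhandler.schema.path(schema_key, self.name)
    )
    def watcher(data, stat, event=""):
        if event and event.type == "DELETED":
            # Key deleted; stop watching, this instance is about to be reaped
            return False
        try:
            value = data.decode("ascii")
        except AttributeError:
            value = default
        if transform is not None:
            try:
                value = transform(value)
            except (TypeError, ValueError):
                pass
        if value != getattr(self, attribute):
            setattr(self, attribute, value)


# e.g. inside __init__:
#   register_state_watch(self, "node.state.daemon", "daemon_state", "stop")
#   register_state_watch(self, "node.monitoring.health", "health", 100, int)
```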

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
# zookeeper.py - Utility functions for pvchealthd Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out(
"Connecting to Zookeeper on coordinator nodes {}".format(
config["coordinators"]
),
state="i",
)
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out(
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
)
os._exit(1)
logger.out("Validating Zookeeper schema", state="i")
try:
node_schema_version = int(
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
)
except Exception:
node_schema_version = int(zkhandler.read("base.schema.version"))
zkhandler.write(
[
(
("node.data.active_schema", config["node_hostname"]),
node_schema_version,
)
]
)
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest installed schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
zkhandler.write(
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
)
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children("base.node"):
node_latest_schema_version.append(
int(zkhandler.read(("node.data.latest_schema", node)))
)
# This is true only if every node's reported latest schema version matches
# the latest version, i.e. all nodes have the latest schema installed and ready to load.
if node_latest_schema_version.count(latest_schema_version) == len(
node_latest_schema_version
):
zkhandler.write([("base.schema.version", latest_schema_version)])
return zkhandler, node_schema_version
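The consensus check in connect() only bumps the cluster-wide base.schema.version once the last node reports the latest installed schema. A worked sketch of that check in isolation, with made-up node version data:

```python
# Worked sketch of the schema-consensus check above; version data is made up.
latest_schema_version = 9
node_latest_schema_version = [9, 9, 8]  # hypothetical per-node reports

# The cluster schema version is bumped only when every node reports the
# latest version; with the data above this stays False until the third
# node upgrades and the list becomes [9, 9, 9].
all_upgraded = node_latest_schema_version.count(latest_schema_version) == len(
    node_latest_schema_version
)
print(all_upgraded)  # False
```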
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out("Found schema violations, applying", state="i")
zkhandler.schema.apply(zkhandler)
else:
logger.out("Schema successfully validated", state="o")
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config["daemon_mode"] == "coordinator":
init_routerstate = "secondary"
else:
init_routerstate = "client"
if zkhandler.exists(("node", config["node_hostname"])):
logger.out(
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
)
# Update static data just in case it's changed
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
]
)
else:
logger.out(
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
state="i",
)
keepalive_time = int(time.time())
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.domain", config["node_hostname"]), "flushed"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
(("node.memory.total", config["node_hostname"]), "0"),
(("node.memory.used", config["node_hostname"]), "0"),
(("node.memory.free", config["node_hostname"]), "0"),
(("node.memory.allocated", config["node_hostname"]), "0"),
(("node.memory.provisioned", config["node_hostname"]), "0"),
(("node.vcpu.allocated", config["node_hostname"]), "0"),
(("node.cpu.load", config["node_hostname"]), "0.0"),
(("node.running_domains", config["node_hostname"]), "0"),
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
(("node.count.networks", config["node_hostname"]), "0"),
]
)

View File

@ -20,14 +20,12 @@
###############################################################################
import pvcnoded.util.keepalive
import pvcnoded.util.config
import pvcnoded.util.fencing
import pvcnoded.util.networking
import pvcnoded.util.services
import pvcnoded.util.libvirt
import pvcnoded.util.zookeeper
import pvcnoded.objects.MonitoringInstance as MonitoringInstance
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
@ -36,6 +34,7 @@ import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
import pvcnoded.objects.CephInstance as CephInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
import daemon_lib.common as common
@ -49,7 +48,7 @@ import re
import json
# Daemon version
version = "0.9.82"
version = "0.9.83"
##########################################################
@ -59,41 +58,40 @@ version = "0.9.82"
def entrypoint():
keepalive_timer = None
monitoring_instance = None
# Get our configuration
config = pvcnoded.util.config.get_configuration()
config = cfg.get_configuration()
config["daemon_name"] = "pvcnoded"
config["daemon_version"] = version
# Create and validate our directories
pvcnoded.util.config.validate_directories(config)
cfg.validate_directories(config)
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster node daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
logger.out("| ID: {0: <54} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
logger.out("|------------------------------------------------------------|")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster node daemon v{0: <22} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
@ -202,7 +200,7 @@ def entrypoint():
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
nonlocal logger, zkhandler, keepalive_timer, d_domain
logger.out("Terminating pvcnoded and cleaning up", state="s")
@ -251,13 +249,6 @@ def entrypoint():
except Exception:
pass
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@ -395,8 +386,8 @@ def entrypoint():
node_list = new_node_list
logger.out(
f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}',
state="i",
f'{logger.fmt_cyan}Node list:{logger.fmt_end} {" ".join(node_list)}',
state="s",
)
# Update node objects lists
@ -563,8 +554,8 @@ def entrypoint():
# Update the new list
network_list = new_network_list
logger.out(
f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}',
state="i",
f'{logger.fmt_cyan}Network list:{logger.fmt_end} {" ".join(network_list)}',
state="s",
)
# Update node objects list
@ -894,8 +885,8 @@ def entrypoint():
sriov_vf_list = sorted(new_sriov_vf_list)
logger.out(
f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
state="i",
f'{logger.fmt_cyan}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
state="s",
)
if config["enable_hypervisor"]:
@ -921,8 +912,8 @@ def entrypoint():
# Update the new list
domain_list = new_domain_list
logger.out(
f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
state="i",
f'{logger.fmt_cyan}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
state="s",
)
# Update node objects' list
@ -948,8 +939,8 @@ def entrypoint():
# Update the new list
osd_list = new_osd_list
logger.out(
f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
state="i",
f'{logger.fmt_cyan}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
state="s",
)
# Pool objects
@ -973,8 +964,8 @@ def entrypoint():
# Update the new list
pool_list = new_pool_list
logger.out(
f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
state="i",
f'{logger.fmt_cyan}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
state="s",
)
# Volume objects (in each pool)
@ -1005,15 +996,10 @@ def entrypoint():
# Update the new list
volume_list[pool] = new_volume_list
logger.out(
f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
state="i",
f'{logger.fmt_cyan}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
state="s",
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node

View File

@ -131,11 +131,11 @@ class MetadataAPIInstance(object):
# Helper functions
def open_database(self):
conn = psycopg2.connect(
host=self.config["metadata_postgresql_host"],
port=self.config["metadata_postgresql_port"],
dbname=self.config["metadata_postgresql_dbname"],
user=self.config["metadata_postgresql_user"],
password=self.config["metadata_postgresql_password"],
host=self.config["api_postgresql_host"],
port=self.config["api_postgresql_port"],
dbname=self.config["api_postgresql_dbname"],
user=self.config["api_postgresql_user"],
password=self.config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=RealDictCursor)
return conn, cur

View File

@ -700,6 +700,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now()
logger.out(
"Starting node keepalive run",
state="t",
)
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:

View File

@ -78,13 +78,19 @@ def start_keydb(logger, config):
common.run_os_command("systemctl start keydb-server.service")
def start_worker(logger, config):
def start_workerd(logger, config):
if config["enable_worker"]:
logger.out("Starting Celery Worker daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvcworkerd.service")
def start_healthd(logger, config):
logger.out("Starting Health Monitoring daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvchealthd.service")
def start_system_services(logger, config):
start_zookeeper(logger, config)
start_libvirtd(logger, config)
@ -93,7 +99,8 @@ def start_system_services(logger, config):
start_ceph_mon(logger, config)
start_ceph_mgr(logger, config)
start_keydb(logger, config)
start_worker(logger, config)
start_workerd(logger, config)
start_healthd(logger, config)
logger.out("Waiting 10 seconds for daemons to start", state="s")
sleep(10)
logger.out("Waiting 5 seconds for daemons to start", state="s")
sleep(5)

View File

@ -290,7 +290,7 @@ logging:
# Enable or disable cluster detail logging during keepalive events
log_cluster_details: yes
# Enable or disable monitoring detail logging during keepalive events
# Enable or disable monitoring detail logging during healthcheck events
log_monitoring_details: yes
# Number of VM console log lines to store in Zookeeper (per VM)

worker-daemon/daemon_lib (symbolic link)
View File

@ -0,0 +1 @@
../daemon-common

worker-daemon/pvcworkerd.py (new executable file)
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvcworkerd.py - Worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcworkerd.Daemon # noqa: F401
pvcworkerd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster worker daemon unit file
[Unit]
Description = Parallel Virtual Cluster worker daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcworkerd.sh
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
10.*)
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
*)
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
esac

View File

@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Daemon.py - PVC Node Worker daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from celery import Celery
import daemon_lib.config as cfg
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import (
worker_run_benchmark,
)
from daemon_lib.vmbuilder import (
worker_create_vm,
)
# Daemon version
version = "0.9.83"
config = cfg.get_configuration()
config["daemon_name"] = "pvcworkerd"
config["daemon_version"] = version
celery_task_uri = "redis://{}:{}{}".format(
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
"pvcworkerd",
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return worker_create_vm(
self,
config,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
def entrypoint():
pass
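With every task now defined on the worker side, a caller such as the API daemon can dispatch purely by task name and per-node queue, matching the --queues $(hostname -s) convention in pvcworkerd.sh. A minimal client-side sketch; the broker URI, node name, and VM name are illustrative:

```python
# Minimal dispatch sketch: route a task to one node's worker by queue name,
# without defining the task locally. Broker URI and names are illustrative.
from celery import Celery

celery = Celery("pvcworkerd", broker="redis://127.0.0.1:6379/0")

# Send a lock-flush for VM "testvm" to the worker on node "hv1", which
# listens on a queue named after its short hostname.
result = celery.send_task(
    "vm.flush_locks",
    kwargs={"domain": "testvm", "force_unlock": False, "run_on": "hv1"},
    queue="hv1",
)
print(result.id)  # task UUID; with a result backend, result.get() returns the outcome
```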
