Compare commits

...

19 Commits

Author SHA1 Message Date
102c3c3106 Port all Celery worker functions to discrete pkg
Moves all tasks run by the Celery worker into a discrete package/module
for easier installation. Also adjusts several parameters throughout to
accomplish this.
2023-11-30 02:24:54 -05:00
0c0fb65c62 Rework Flask API to route Celery tasks manually
Avoids needing to define any of these tasks here; they can all be
defined in the pvcworkerd code.
2023-11-30 00:40:09 -05:00
03a738f878 Move config parser into daemon_lib
And reformat/add config values for API.
2023-11-30 00:05:37 -05:00
4df5fdbca6 Update description of example conf 2023-11-29 21:21:51 -05:00
97eb63ebab Clean up config naming and dead files 2023-11-29 21:21:51 -05:00
4a2eba0961 Improve node output messages (from pvchealthd)
1. Output startup "list" entries in cyan with s state
2. Add start of keepalive run message
2023-11-29 21:21:51 -05:00
077dd8708f Add check start message 2023-11-29 21:21:51 -05:00
b6b5786c3b Output list in cyan (s state) 2023-11-29 21:21:51 -05:00
ad738dec40 Clean up plugin pycache too 2023-11-29 21:21:51 -05:00
d2b764a2c7 Output more details on startup 2023-11-29 21:21:51 -05:00
b8aecd9c83 Wait less time between restarts 2023-11-29 21:21:51 -05:00
11db3c5b20 Fix ordering during termination 2023-11-29 21:21:51 -05:00
7a7c975eff Ensure return from health shutdown 2023-11-29 21:21:51 -05:00
fa12a3c9b1 Permit buffered log appending 2023-11-29 21:21:51 -05:00
787f4216b3 Expand Zookeeper log daemon prefix to match 2023-11-29 21:21:51 -05:00
647cba3cf5 Expand startup width for new daemon name 2023-11-29 21:21:51 -05:00
921ecb3a05 Fix name in kydb plugin 2023-11-29 21:21:51 -05:00
6a68cf665b Wait between service restarts 2023-11-29 21:21:51 -05:00
41f4e4fb2f Split health monitoring into discrete daemon/pkg 2023-11-29 21:21:51 -05:00
58 changed files with 1227 additions and 572 deletions


@@ -19,13 +19,13 @@
#
###############################################################################
import os
import yaml
from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg
# Daemon version
version = "0.9.82"
@@ -53,160 +53,13 @@ def strtobool(stringv):
# Configuration Parsing
##########################################################
# Parse the configuration file
config_file = None
try:
_config_file = "/etc/pvc/pvcapid.yaml"
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "legacy"
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "current"
except Exception:
pass
if not config_file:
print(
'Error: The "PVC_CONFIG_FILE" environment variable must be set before starting pvcapid.'
)
exit(1)
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvcapid"
config["daemon_version"] = version
def load_configuration_file(config_file):
print('Loading configuration from file "{}"'.format(config_file))
# Read in the config
try:
with open(config_file, "r") as cfgfile:
o_config = yaml.load(cfgfile, Loader=yaml.BaseLoader)
except Exception as e:
print("ERROR: Failed to parse configuration file: {}".format(e))
exit(1)
return o_config
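One detail worth noting, in both the old pvcapid parser above and the relocated daemon_lib parser later in this diff: yaml.BaseLoader performs no implicit typing, so every scalar comes back as a string. That is why the defaults are the strings "False"/"True" and why values pass through strtobool() and int() on their way into config. A quick demonstration:

import yaml

# BaseLoader applies no implicit typing: booleans, integers, and nulls
# all arrive as plain strings, so callers must convert explicitly.
doc = yaml.load("debug_logging: False\nport: 7370", Loader=yaml.BaseLoader)
print(doc)                # {'debug_logging': 'False', 'port': '7370'}
print(type(doc["port"]))  # <class 'str'>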
def get_configuration_current(config_file):
o_config = load_configuration_file(config_file)
try:
# Create the config object
config = {
"debug": strtobool(o_config["logging"].get("debug_logging", "False")),
"all_nodes": o_config["cluster"]["all_nodes"],
"coordinators": o_config["cluster"]["coordinator_nodes"],
"listen_address": o_config["api"]["listen"]["address"],
"listen_port": int(o_config["api"]["listen"]["port"]),
"auth_enabled": strtobool(
o_config["api"]["authentication"].get("enabled", "False")
),
"auth_secret_key": o_config["api"]["authentication"]["secret_key"],
"auth_source": o_config["api"]["authentication"]["source"],
"ssl_enabled": strtobool(o_config["api"]["ssl"].get("enabled", "False")),
"ssl_cert_file": o_config["api"]["ssl"]["certificate"],
"ssl_key_file": o_config["api"]["ssl"]["private_key"],
"database_port": o_config["database"]["postgres"]["port"],
"database_host": o_config["database"]["postgres"]["hostname"],
"database_name": o_config["database"]["postgres"]["credentials"]["api"][
"database"
],
"database_user": o_config["database"]["postgres"]["credentials"]["api"][
"username"
],
"database_password": o_config["database"]["postgres"]["credentials"]["api"][
"password"
],
"queue_port": o_config["database"]["keydb"]["port"],
"queue_host": o_config["database"]["keydb"]["hostname"],
"queue_path": o_config["database"]["keydb"]["path"],
"storage_domain": o_config["cluster"]["networks"]["storage"]["domain"],
"storage_hosts": o_config["ceph"].get("monitor_hosts", None),
"ceph_monitor_port": o_config["ceph"]["monitor_port"],
"ceph_storage_secret_uuid": o_config["ceph"]["secret_uuid"],
}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
config["storage_hosts"] = config["coordinators"]
# Set up our token list if specified
if config["auth_source"] == "token":
config["auth_tokens"] = o_config["api"]["token"]
else:
if config["auth_enabled"]:
print(
"WARNING: No authentication method provided; disabling authentication."
)
config["auth_enabled"] = False
except Exception as e:
print(f"ERROR: Failed to load configuration: {e}")
exit(1)
return config
def get_configuration_legacy(config_file):
o_config = load_configuration_file(config_file)
try:
# Create the config object
config = {
"debug": strtobool(o_config["pvc"]["debug"]),
"coordinators": o_config["pvc"]["coordinators"],
"listen_address": o_config["pvc"]["api"]["listen_address"],
"listen_port": int(o_config["pvc"]["api"]["listen_port"]),
"auth_enabled": strtobool(
o_config["pvc"]["api"]["authentication"]["enabled"]
),
"auth_secret_key": o_config["pvc"]["api"]["authentication"]["secret_key"],
"auth_tokens": o_config["pvc"]["api"]["authentication"]["tokens"],
"ssl_enabled": strtobool(o_config["pvc"]["api"]["ssl"]["enabled"]),
"ssl_key_file": o_config["pvc"]["api"]["ssl"]["key_file"],
"ssl_cert_file": o_config["pvc"]["api"]["ssl"]["cert_file"],
"database_host": o_config["pvc"]["provisioner"]["database"]["host"],
"database_port": int(o_config["pvc"]["provisioner"]["database"]["port"]),
"database_name": o_config["pvc"]["provisioner"]["database"]["name"],
"database_user": o_config["pvc"]["provisioner"]["database"]["user"],
"database_password": o_config["pvc"]["provisioner"]["database"]["pass"],
"queue_host": o_config["pvc"]["provisioner"]["queue"]["host"],
"queue_port": o_config["pvc"]["provisioner"]["queue"]["port"],
"queue_path": o_config["pvc"]["provisioner"]["queue"]["path"],
"storage_hosts": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_hosts"
],
"storage_domain": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_domain"
],
"ceph_monitor_port": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_monitor_port"
],
"ceph_storage_secret_uuid": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_storage_secret_uuid"
],
}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"]:
config["storage_hosts"] = config["coordinators"]
except Exception as e:
print("ERROR: Failed to load configuration: {}".format(e))
exit(1)
return config
if config_type == "legacy":
config = get_configuration_legacy(config_file)
else:
config = get_configuration_current(config_file)
##########################################################
# Entrypoint
##########################################################
@@ -215,41 +68,43 @@ else:
def entrypoint():
import pvcapid.flaskapi as pvc_api # noqa: E402
if config["ssl_enabled"]:
if config["api_ssl_enabled"]:
context = SSLContext()
context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs()
context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"])
context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else:
context = None
# Print our startup messages
print("")
print("|------------------------------------------------------------|")
print("| |")
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
print("| ██ ▜█▙ ▟█▛ ██ |")
print("| ███████████ ▜█▙ ▟█▛ ██ |")
print("| ██ ▜█▙▟█▛ ███████████ |")
print("| |")
print("|------------------------------------------------------------|")
print("| Parallel Virtual Cluster API daemon v{0: <21} |".format(version))
print("| Debug: {0: <51} |".format(str(config["debug"])))
print("| API version: v{0: <44} |".format(API_VERSION))
print("|--------------------------------------------------------------|")
print("| |")
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
print("| ██ ▜█▙ ▟█▛ ██ |")
print("| ███████████ ▜█▙ ▟█▛ ██ |")
print("| ██ ▜█▙▟█▛ ███████████ |")
print("| |")
print("|--------------------------------------------------------------|")
print("| Parallel Virtual Cluster API daemon v{0: <23} |".format(version))
print("| Debug: {0: <53} |".format(str(config["debug"])))
print("| API version: v{0: <46} |".format(API_VERSION))
print(
"| Listen: {0: <50} |".format(
"{}:{}".format(config["listen_address"], config["listen_port"])
"| Listen: {0: <52} |".format(
"{}:{}".format(config["api_listen_address"], config["api_listen_port"])
)
)
print("| SSL: {0: <53} |".format(str(config["ssl_enabled"])))
print("| Authentication: {0: <42} |".format(str(config["auth_enabled"])))
print("|------------------------------------------------------------|")
print("| SSL: {0: <55} |".format(str(config["api_ssl_enabled"])))
print("| Authentication: {0: <44} |".format(str(config["api_auth_enabled"])))
print("|--------------------------------------------------------------|")
print("")
pvc_api.celery_startup()
pvc_api.app.run(
config["listen_address"],
config["listen_port"],
config["api_listen_address"],
config["api_listen_port"],
threaded=True,
ssl_context=context,
)


@@ -31,25 +31,12 @@ from uuid import uuid4
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import list_benchmarks
from pvcapid.Daemon import config, strtobool, API_VERSION
import pvcapid.helper as api_helper
import pvcapid.provisioner as api_provisioner
import pvcapid.vmbuilder as api_vmbuilder
import pvcapid.benchmark as api_benchmark
import pvcapid.ova as api_ova
from flask_sqlalchemy import SQLAlchemy
@@ -61,11 +48,11 @@ app = flask.Flask(__name__)
# Set up SQLAlchemy backend
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
config["database_user"],
config["database_password"],
config["database_host"],
config["database_port"],
config["database_name"],
config["api_postgresql_user"],
config["api_postgresql_password"],
config["api_postgresql_host"],
config["api_postgresql_port"],
config["api_postgresql_dbname"],
)
if config["debug"]:
@@ -73,8 +60,8 @@ if config["debug"]:
else:
app.config["DEBUG"] = False
if config["auth_enabled"]:
app.config["SECRET_KEY"] = config["auth_secret_key"]
if config["api_auth_enabled"]:
app.config["SECRET_KEY"] = config["api_auth_secret_key"]
# Create SQLAlchemy database
db = SQLAlchemy(app)
@@ -99,41 +86,34 @@ def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
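A note on the @ZKConnection(config) decorator seen throughout this file: it supplies a connected Zookeeper handler as the wrapped function's first positional argument, which is why the inner run_* functions below declare zkhandler but their callers never pass one. A minimal sketch of the pattern; the decorator's internals are an assumption inferred from its usage here, not code from this diff:

from functools import wraps

from daemon_lib.zkhandler import ZKHandler


def ZKConnection(config):
    def decorator(function):
        @wraps(function)
        def connection(*args, **kwargs):
            # Open a connection, inject the handler as the first
            # argument, and always clean up afterwards (sketch only).
            zkhandler = ZKHandler(config)
            zkhandler.connect()
            try:
                return function(zkhandler, *args, **kwargs)
            finally:
                zkhandler.disconnect()
        return connection
    return decorator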
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# Set up Celery task ID generator
# 1. Lets us make our own IDs (first section of UUID)
# 2. Lets us distribute jobs to the required pvcworkerd instances
def run_celery_task(task_name, **kwargs):
task_id = str(uuid4()).split("-")[0]
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
if "run_on" in kwargs and kwargs["run_on"] != "primary":
run_on = kwargs["run_on"]
else:
run_on = get_primary_node()
print(f"Selected Celery worker: {run_on}")
print("----")
return run_on
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
print(
f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
)
task = celery.send_task(
task_name,
task_id=task_id,
kwargs=kwargs,
queue=run_on,
)
return task
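The net effect of the rework is that the API process no longer imports any task code: celery.send_task() dispatches by name to a per-node queue, and whichever pvcworkerd instance consumes that queue runs the task. A hypothetical call (the task name is real and used below; the VM and node names are illustrative):

# Fire the named task at node "hv1"'s queue and keep the short ID
# for later status polling via the returned AsyncResult.
task = run_celery_task("vm.flush_locks", domain="testvm1", run_on="hv1")
print(task.id)  # first segment of a UUID4, e.g. "c03945f5"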
# Create celery definition
celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
app.name,
@@ -153,7 +133,6 @@ def celery_startup():
app.config["task_queues"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
app.config["task_routes"] = (route_task,)
celery.conf.update(app.config)
@@ -199,7 +178,7 @@ def Authenticator(function):
@wraps(function)
def authenticate(*args, **kwargs):
# No authentication required
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return function(*args, **kwargs)
# Session-based authentication
if "token" in flask.session:
@@ -208,7 +187,7 @@ def Authenticator(function):
if "X-Api-Key" in flask.request.headers:
if any(
token
for token in config["auth_tokens"]
for token in config["api_auth_tokens"]
if flask.request.headers.get("X-Api-Key") == token.get("token")
):
return function(*args, **kwargs)
@@ -220,171 +199,6 @@ def Authenticator(function):
return authenticate
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return api_vmbuilder.create_vm(
self,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def run_benchmark(self, pool=None, run_on="primary"):
return api_benchmark.run_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
##########################################################
# API Root/Authentication
##########################################################
@@ -469,12 +283,12 @@ class API_Login(Resource):
type: object
id: Message
"""
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root))
if any(
token
for token in config["auth_tokens"]
for token in config["api_auth_tokens"]
if flask.request.values["token"] in token["token"]
):
flask.session["token"] = flask.request.form["token"]
@@ -503,7 +317,7 @@ class API_Logout(Resource):
302:
description: Authentication disabled
"""
if not config["auth_enabled"]:
if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root))
flask.session.pop("token", None)
@@ -2459,7 +2273,7 @@ class API_VM_Locks(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@@ -2594,7 +2408,7 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@@ -2648,7 +2462,7 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@@ -4364,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string (integer)
description: The number of minor page faults during the test
"""
return api_benchmark.list_benchmarks(reqargs.get("job", None))
return list_benchmarks(config, reqargs.get("job", None))
@RequestParser(
[
@@ -4405,7 +4219,7 @@
}, 400
task = run_celery_task(
run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
"storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
)
return (
{
@@ -4531,7 +4345,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
node = reqargs.get("node", None)
task = run_celery_task(
osd_add_db_vg, device=reqargs.get("device", None), run_on=node
"osd.add_db_vg", device=reqargs.get("device", None), run_on=node
)
return (
@@ -4730,7 +4544,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
node = reqargs.get("node", None)
task = run_celery_task(
osd_add,
"osd.add",
device=reqargs.get("device", None),
weight=reqargs.get("weight", None),
ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4849,7 +4663,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_replace,
"osd.replace",
osd_id=osdid,
new_device=reqargs.get("new_device"),
old_device=reqargs.get("old_device", None),
@@ -4907,7 +4721,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_refresh,
"osd.refresh",
osd_id=osdid,
device=reqargs.get("device", None),
ext_db_flag=False,
@@ -4978,7 +4792,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode
task = run_celery_task(
osd_remove,
"osd.remove",
osd_id=osdid,
force_flag=reqargs.get("force", False),
run_on=node,
@@ -8507,7 +8321,7 @@ class API_Provisioner_Create_Root(Resource):
start_vm = False
task = run_celery_task(
create_vm,
"provisioner.create",
vm_name=reqargs.get("name", None),
profile_name=reqargs.get("profile", None),
define_vm=define_vm,


@@ -48,11 +48,11 @@ import pvcapid.provisioner as provisioner
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur
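Since the cursor factory is psycopg2.extras.RealDictCursor, every fetched row is a dictionary keyed by column name rather than a positional tuple. A brief usage sketch (the table and job value are illustrative, borrowed from the benchmark queries elsewhere in this diff):

conn, cur = open_database(config)
cur.execute(
    "SELECT * FROM storage_benchmarks WHERE job = %s;",
    ("2023-11-29T21:21:51_hv1",),
)
for row in cur.fetchall():
    print(row["job"])  # dict-style access instead of row[0]
cur.close()
conn.close()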


@@ -63,11 +63,11 @@ class ProvisioningError(Exception):
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur


@@ -1,16 +0,0 @@
# Parallel Virtual Cluster Celery Worker daemon unit file
[Unit]
Description = Parallel Virtual Cluster Celery Worker daemon
After = network-online.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStart = /usr/share/pvc/pvcworkerd.sh
Restart = on-failure
[Install]
WantedBy = multi-user.target


@@ -42,7 +42,7 @@ echo " done. Package version ${version}."
# Install the client(s) locally
echo -n "Installing client packages locally..."
$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null
$SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
echo " done".
for HOST in ${HOSTS[@]}; do
@@ -59,12 +59,16 @@ fi
for HOST in ${HOSTS[@]}; do
echo "> Deploying packages to host ${HOST}"
echo -n "Installing packages..."
ssh $HOST $SUDO dpkg -i /tmp/pvc/*.deb &>/dev/null
ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
ssh $HOST rm -rf /tmp/pvc &>/dev/null
echo " done."
echo -n "Restarting PVC daemons..."
ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
echo " done."
echo -n "Waiting for node daemon to be running..."


@@ -13,9 +13,11 @@ echo ${new_ver} >&3
tmpdir=$( mktemp -d )
cp -a debian/changelog client-cli/setup.py ${tmpdir}/
cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
# Replace the "base" version with the git revision version
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
cat <<EOF > debian/changelog
pvc (${new_ver}) unstable; urgency=medium
@@ -33,6 +35,8 @@ dpkg-buildpackage -us -uc
cp -a ${tmpdir}/changelog debian/changelog
cp -a ${tmpdir}/setup.py client-cli/setup.py
cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
# Clean up


@@ -20,6 +20,8 @@ changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
rm ${changelog_file}
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvchealthd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcworkerd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
echo ${new_version} > .version
@@ -46,7 +48,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
mv ${deb_changelog_file} debian/changelog
git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git commit -v
popd &>/dev/null


@@ -25,9 +25,6 @@ import psycopg2.extras
from datetime import datetime
from json import loads, dumps
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common
@@ -135,11 +132,11 @@ def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
# Database connections
def open_database(config):
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur
@@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
conn.close()
def list_benchmarks(job=None):
def list_benchmarks(config, job=None):
if job is not None:
query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
args = (job,)
@@ -278,17 +275,8 @@ def run_benchmark_job(
return jstdout
def run_benchmark(self, pool):
def worker_run_benchmark(zkhandler, celery, config, pool):
# Phase 0 - connect to databases
try:
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
fail(
self,
"Failed to connect to Zookeeper",
)
cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}"
@@ -296,7 +284,7 @@ def run_benchmark(self, pool):
current_stage = 0
total_stages = 13
start(
self,
celery,
f"Running storage benchmark '{job_name}' on pool '{pool}'",
current=current_stage,
total=total_stages,
@@ -312,13 +300,13 @@ def run_benchmark(self, pool):
zkhandler=zkhandler,
)
fail(
self,
celery,
"Failed to connect to Postgres",
)
current_stage += 1
update(
self,
celery,
"Storing running status in database",
current=current_stage,
total=total_stages,
@@ -340,11 +328,11 @@ def run_benchmark(self, pool):
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)
current_stage += 1
update(
self,
celery,
"Creating benchmark volume",
current=current_stage,
total=total_stages,
@@ -363,7 +351,7 @@ def run_benchmark(self, pool):
for test in test_matrix:
current_stage += 1
update(
self,
celery,
f"Running benchmark job '{test}'",
current=current_stage,
total=total_stages,
@@ -381,7 +369,7 @@ def run_benchmark(self, pool):
# Phase 3 - cleanup
current_stage += 1
update(
self,
celery,
"Cleaning up venchmark volume",
current=current_stage,
total=total_stages,
@@ -397,7 +385,7 @@ def run_benchmark(self, pool):
current_stage += 1
update(
self,
celery,
"Storing results in database",
current=current_stage,
total=total_stages,
@@ -415,7 +403,7 @@ def run_benchmark(self, pool):
db_cur=db_cur,
zkhandler=zkhandler,
)
fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)
cleanup(
job_name,
@@ -426,7 +414,7 @@ def run_benchmark(self, pool):
current_stage += 1
return finish(
self,
celery,
f"Storage benchmark {job_name} completed successfully",
current=current_stage,
total=total_stages,
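worker_run_benchmark() now receives its zkhandler, Celery task object, and config from the caller instead of constructing them itself; per the commit messages, that caller lives in pvcworkerd, which is not shown in this diff. A hypothetical worker-side registration, assuming pvcworkerd exposes a celery app and a config dict (every name here is an assumption except the task name, which must match what the API dispatches):

# Hypothetical pvcworkerd wrapper; "storage.benchmark" matches the
# name sent by run_celery_task() in pvcapid/flaskapi.py.
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
    zkhandler = ZKHandler(config)
    zkhandler.connect()
    try:
        return worker_run_benchmark(zkhandler, self, config, pool)
    finally:
        zkhandler.disconnect()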


@@ -271,17 +271,17 @@ def get_configuration_current(config_file):
"keydb_port": o_database["keydb"]["port"],
"keydb_host": o_database["keydb"]["hostname"],
"keydb_path": o_database["keydb"]["path"],
"metadata_postgresql_port": o_database["postgres"]["port"],
"metadata_postgresql_host": o_database["postgres"]["hostname"],
"metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"api_postgresql_port": o_database["postgres"]["port"],
"api_postgresql_host": o_database["postgres"]["hostname"],
"api_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"database"
],
"metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
"api_postgresql_user": o_database["postgres"]["credentials"]["api"][
"username"
],
"metadata_postgresql_password": o_database["postgres"]["credentials"][
"api"
]["password"],
"api_postgresql_password": o_database["postgres"]["credentials"]["api"][
"password"
],
"pdns_postgresql_port": o_database["postgres"]["port"],
"pdns_postgresql_host": o_database["postgres"]["hostname"],
"pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
@@ -335,9 +335,7 @@ def get_configuration_current(config_file):
"log_keepalive_cluster_details": o_logging.get(
"log_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_monitoring_details", False
),
"log_monitoring_details": o_logging.get("log_monitoring_details", False),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
@@ -362,9 +360,49 @@ def get_configuration_current(config_file):
+ o_ceph["ceph_keyring_file"],
"ceph_monitor_port": o_ceph["monitor_port"],
"ceph_secret_uuid": o_ceph["secret_uuid"],
"storage_hosts": o_ceph.get("monitor_hosts", None),
}
config = {**config, **config_ceph}
o_api = o_config["api"]
o_api_listen = o_api["listen"]
config_api_listen = {
"api_listen_address": o_api_listen["address"],
"api_listen_port": o_api_listen["port"],
}
config = {**config, **config_api_listen}
o_api_authentication = o_api["authentication"]
config_api_authentication = {
"api_auth_enabled": o_api_authentication.get("enabled", False),
"api_auth_secret_key": o_api_authentication.get("secret_key", ""),
"api_auth_source": o_api_authentication.get("source", "token"),
}
config = {**config, **config_api_authentication}
o_api_ssl = o_api["ssl"]
config_api_ssl = {
"api_ssl_enabled": o_api_ssl.get("enabled", False),
"api_ssl_cert_file": o_api_ssl.get("certificate", None),
"api_ssl_key_file": o_api_ssl.get("private_key", None),
}
config = {**config, **config_api_ssl}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
config["storage_hosts"] = config["coordinators"]
# Set up our token list if specified
if config["api_auth_source"] == "token":
config["api_auth_tokens"] = o_api["token"]
else:
if config["api_auth_enabled"]:
print(
"WARNING: No authentication method provided; disabling API authentication."
)
config["api_auth_enabled"] = False
# Add our node static data to the config
config["static_data"] = get_static_data()


@@ -80,7 +80,7 @@ class Logger(object):
self.config["log_directory"] + "/" + self.config["daemon_name"] + ".log"
)
# We open the logfile for the duration of our session, but have a hup function
self.writer = open(self.logfile, "a", buffering=0)
self.writer = open(self.logfile, "a")
self.last_colour = ""
self.last_prompt = ""
@@ -93,12 +93,10 @@ class Logger(object):
# Provide a hup function to close and reopen the writer
def hup(self):
self.writer.close()
self.writer = open(self.logfile, "a", buffering=0)
self.writer = open(self.logfile, "a")
# Provide a termination function so all messages are flushed before terminating the main daemon
def terminate(self):
if self.config["file_logging"]:
self.writer.close()
if self.config["zookeeper_logging"]:
self.out("Waiting for Zookeeper message queue to drain", state="s")
@@ -112,6 +110,9 @@ class Logger(object):
self.zookeeper_logger.stop()
self.zookeeper_logger.join()
if self.config["file_logging"]:
self.writer.close()
# Output function
def out(self, message, state=None, prefix=""):
# Get the date
@@ -152,11 +153,14 @@ class Logger(object):
# Assemble output string
output = colour + prompt + endc + date + prefix + message
self.writer.write(output + "\n")
self.writer.flush()
# Log to Zookeeper
if self.config["zookeeper_logging"]:
# Set the daemon value (only used here as others do not overlap with different daemons)
daemon = f"{self.config['daemon_name']}: "
# Expand to match all daemon names (pvcnoded, pvcworkerd, pvchealthd)
daemon = "{daemon: <12}".format(daemon=daemon)
# Assemble output string
output = daemon + colour + prompt + endc + date + prefix + message
self.zookeeper_queue.put(output)
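The buffering change above is not just stylistic: in Python 3, buffering=0 is only valid for binary-mode files, so a text-mode appender must use default buffering and flush explicitly after each message, which is exactly what the added flush() does. A quick demonstration of the constraint:

# Python 3 forbids unbuffered text I/O; only binary mode ("ab")
# accepts buffering=0.
try:
    open("/tmp/example.log", "a", buffering=0)
except ValueError as err:
    print(err)  # can't have unbuffered text I/O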


@@ -31,8 +31,6 @@ import uuid
from contextlib import contextmanager
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
@@ -167,11 +165,11 @@ def chroot(destination):
def open_db(config):
try:
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception:
@@ -216,8 +214,14 @@ def open_zk(config):
#
# Main VM provisioning function - executed by the Celery worker
#
def create_vm(
celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
def worker_create_vm(
celery,
config,
vm_name,
vm_profile,
define_vm=True,
start_vm=True,
script_run_args=[],
):
current_stage = 0
total_stages = 11
@@ -326,9 +330,9 @@ def create_vm(
vm_data["system_architecture"] = stdout.strip()
monitor_list = list()
coordinator_names = config["storage_hosts"]
for coordinator in coordinator_names:
monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
monitor_names = config["storage_hosts"]
for monitor in monitor_names:
monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
vm_data["ceph_monitor_list"] = monitor_list
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]

debian/control vendored

@@ -4,19 +4,35 @@ Priority: optional
Maintainer: Joshua Boniface <joshua@boniface.me>
Standards-Version: 3.9.8
Homepage: https://www.boniface.me
X-Python3-Version: >= 3.2
X-Python3-Version: >= 3.7
Package: pvc-daemon-node
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Depends: systemd, pvc-daemon-common, pvc-daemon-health, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Description: Parallel Virtual Cluster node daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC node daemon
Package: pvc-daemon-health
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
Description: Parallel Virtual Cluster health daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC health monitoring daemon
Package: pvc-daemon-worker
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
Description: Parallel Virtual Cluster worker daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC Celery task worker daemon
Package: pvc-daemon-api
Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.


@@ -3,7 +3,5 @@ api-daemon/pvcapid-manage*.py usr/share/pvc
api-daemon/pvc-api-db-upgrade usr/share/pvc
api-daemon/pvcapid usr/share/pvc
api-daemon/pvcapid.service lib/systemd/system
api-daemon/pvcworkerd.service lib/systemd/system
api-daemon/pvcworkerd.sh usr/share/pvc
api-daemon/provisioner usr/share/pvc
api-daemon/migrations usr/share/pvc

debian/pvc-daemon-api.preinst vendored Normal file

@@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.install vendored Normal file

@@ -0,0 +1,4 @@
health-daemon/pvchealthd.py usr/share/pvc
health-daemon/pvchealthd usr/share/pvc
health-daemon/pvchealthd.service lib/systemd/system
health-daemon/plugins usr/share/pvc

debian/pvc-daemon-health.postinst vendored Normal file

@@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvchealthd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvchealthd.service; then
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

debian/pvc-daemon-health.preinst vendored Normal file

@@ -0,0 +1,6 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/plugins -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.prerm vendored Normal file

@@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvchealthd.service


@@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvcautoready.service lib/systemd/system
node-daemon/monitoring usr/share/pvc
node-daemon/plugins usr/share/pvc


@@ -2,4 +2,4 @@
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.install vendored Normal file

@@ -0,0 +1,4 @@
worker-daemon/pvcworkerd.sh usr/share/pvc
worker-daemon/pvcworkerd.py usr/share/pvc
worker-daemon/pvcworkerd usr/share/pvc
worker-daemon/pvcworkerd.service lib/systemd/system

debian/pvc-daemon-worker.postinst vendored Normal file

@@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvcworkerd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvcworkerd.service; then
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

debian/pvc-daemon-worker.preinst vendored Normal file

@@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.prerm vendored Normal file

@@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvcworkerd.service


@@ -8,7 +8,7 @@ import os
import sys
import json
os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"
sys.path.append('api-daemon')

health-daemon/daemon_lib Symbolic link

@@ -0,0 +1 @@
../daemon-common


@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -40,7 +40,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
@@ -75,7 +75,7 @@ class MonitoringPluginScript(MonitoringPlugin):
# Set the health delta to 0 (no change)
health_delta = 0
# Craft a message that can be used by the clients
message = "Successfully connected to Libvirtd on localhost"
message = "Successfully connected to KeyDB/Redis on localhost"
# Check the Zookeeper connection
try:


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to


@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

health-daemon/pvchealthd.py Executable file

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvchealthd.py - Health daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.Daemon # noqa: F401
pvchealthd.Daemon.entrypoint()


@@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file
[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target


@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Daemon.py - Health daemon main entrypoint
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.util.zookeeper
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
import pvchealthd.objects.NodeInstance as NodeInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
from time import sleep
import os
import signal
# Daemon version
version = "0.9.82"
##########################################################
# Entrypoint
##########################################################
def entrypoint():
monitoring_instance = None
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvchealthd"
config["daemon_version"] = version
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster health daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
# Connect to Zookeeper and return our handler and current schema version
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, monitoring_instance
logger.out("Terminating pvchealthd and cleaning up", state="s")
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
del zkhandler
except Exception:
pass
logger.out("Terminated health daemon", state="s")
logger.terminate()
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum="", frame=""):
cleanup(failure=False)
# Hangup (logrotate) function
def hup(signum="", frame=""):
if config["file_logging"]:
logger.hup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGHUP, hup)
this_node = NodeInstance.NodeInstance(
config["node_hostname"],
zkhandler,
config,
logger,
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Tick loop; does nothing since everything is async
while True:
try:
sleep(1)
except Exception:
break


@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# PluginInstance.py - Class implementing a PVC monitoring instance
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
@@ -317,8 +317,17 @@ class MonitoringInstance(object):
)
if successful_plugins < 1:
self.logger.out(
"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
state="e",
)
return
self.logger.out(
f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
state="s",
)
# Clean up any old plugin data for which a plugin file no longer exists
plugins_data = self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
@@ -344,6 +353,7 @@ def shutdown(self):
def shutdown(self):
self.stop_check_timer()
self.run_cleanups()
return
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
@@ -395,6 +405,11 @@
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
self.logger.out(
"Starting monitoring healthcheck run",
state="t",
)
total_health = 100
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
@@ -406,10 +421,7 @@
plugin_results.append(future.result())
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
if (
self.config["log_keepalives"]
and self.config["log_keepalive_plugin_details"]
):
if self.config["log_monitoring_details"]:
self.logger.out(
result.message + f" [-{result.health_delta}]",
state="t",


@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
class NodeInstance(object):
# Initialization function
def __init__(
self,
name,
zkhandler,
config,
logger,
):
# Passed-in variables on creation
self.name = name
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# States
self.daemon_state = "stop"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Node resources
self.health = 100
self.active_domains_count = 0
self.provisioned_domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.daemon", self.name)
)
def watch_node_daemonstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "stop"
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.router", self.name)
)
def watch_node_routerstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "client"
if data != self.coordinator_state:
self.coordinator_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.domain", self.name)
)
def watch_node_domainstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "unknown"
if data != self.domain_state:
self.domain_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.monitoring.health", self.name)
)
def watch_node_health(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 100
try:
data = int(data)
except ValueError:
pass
if data != self.health:
self.health = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.free", self.name)
)
def watch_node_memfree(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.used", self.name)
)
def watch_node_memused(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.allocated", self.name)
)
def watch_node_memalloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
)
def watch_node_vcpualloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.running_domains", self.name)
)
def watch_node_runningdomains(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii").split()
except AttributeError:
data = []
if len(data) != self.active_domains_count:
self.active_domains_count = len(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
)
def watch_node_domainscount(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.provisioned_domains_count:
self.provisioned_domains_count = data
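
The nine DataWatch callbacks above all follow one shape: decode the znode data, fall back to a default, and update the attribute only on change. A hypothetical consolidation (not part of this diff), assuming the same zkhandler and schema API used in NodeInstance.py:

def make_watcher(instance, attr, schema_key, default, cast=lambda v: v):
    # Hypothetical helper: builds one DataWatch callback per (attribute, key)
    # pair instead of nine hand-written closures.
    @instance.zkhandler.zk_conn.DataWatch(
        instance.zkhandler.schema.path(schema_key, instance.name)
    )
    def _watch(data, stat, event=""):
        if event and event.type == "DELETED":
            # Key deleted after existing; stop watching, since the instance
            # is about to be reaped
            return False
        try:
            value = cast(data.decode("ascii"))
        except (AttributeError, ValueError):
            value = default
        if value != getattr(instance, attr):
            setattr(instance, attr, value)
    return _watch

Inside __init__ this would reduce each watcher to a single call, e.g. make_watcher(self, "health", "node.monitoring.health", 100, cast=int).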

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
# zookeeper.py - Utility functions for pvchealthd Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out(
"Connecting to Zookeeper on coordinator nodes {}".format(
config["coordinators"]
),
state="i",
)
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out(
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
)
os._exit(1)
logger.out("Validating Zookeeper schema", state="i")
try:
node_schema_version = int(
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
)
except Exception:
node_schema_version = int(zkhandler.read("base.schema.version"))
zkhandler.write(
[
(
("node.data.active_schema", config["node_hostname"]),
node_schema_version,
)
]
)
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest installed schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
zkhandler.write(
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
)
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children("base.node"):
node_latest_schema_version.append(
int(zkhandler.read(("node.data.latest_schema", node)))
)
# This is true only if every node's recorded latest schema version equals the newest version,
# i.e. all nodes have the latest schema installed and are ready to load it.
if node_latest_schema_version.count(latest_schema_version) == len(
node_latest_schema_version
):
zkhandler.write([("base.schema.version", latest_schema_version)])
return zkhandler, node_schema_version
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out("Found schema violations, applying", state="i")
zkhandler.schema.apply(zkhandler)
else:
logger.out("Schema successfully validated", state="o")
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config["daemon_mode"] == "coordinator":
init_routerstate = "secondary"
else:
init_routerstate = "client"
if zkhandler.exists(("node", config["node_hostname"])):
logger.out(
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
)
# Update static data just in case it's changed
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
]
)
else:
logger.out(
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
state="i",
)
keepalive_time = int(time.time())
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.domain", config["node_hostname"]), "flushed"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
(("node.memory.total", config["node_hostname"]), "0"),
(("node.memory.used", config["node_hostname"]), "0"),
(("node.memory.free", config["node_hostname"]), "0"),
(("node.memory.allocated", config["node_hostname"]), "0"),
(("node.memory.provisioned", config["node_hostname"]), "0"),
(("node.vcpu.allocated", config["node_hostname"]), "0"),
(("node.cpu.load", config["node_hostname"]), "0.0"),
(("node.running_domains", config["node_hostname"]), "0"),
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
(("node.count.networks", config["node_hostname"]), "0"),
]
)
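
The schema-consensus check in connect() above fires the cluster-wide update exactly once. A toy illustration with hypothetical values (three nodes, newest schema 8):

node_latest_schema_version = [8, 8, 8]  # hypothetical per-node values
latest_schema_version = 8
# True only once every node has recorded the newest version, so the last
# node to upgrade is the one that writes base.schema.version.
if node_latest_schema_version.count(latest_schema_version) == len(
    node_latest_schema_version
):
    print("fire cluster-wide schema update")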

View File

@ -20,14 +20,12 @@
###############################################################################
import pvcnoded.util.keepalive
import pvcnoded.util.config
import pvcnoded.util.fencing
import pvcnoded.util.networking
import pvcnoded.util.services
import pvcnoded.util.libvirt
import pvcnoded.util.zookeeper
import pvcnoded.objects.MonitoringInstance as MonitoringInstance
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
@ -36,6 +34,7 @@ import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
import pvcnoded.objects.CephInstance as CephInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
import daemon_lib.common as common
@ -59,41 +58,40 @@ version = "0.9.82"
def entrypoint():
keepalive_timer = None
monitoring_instance = None
# Get our configuration
config = pvcnoded.util.config.get_configuration()
config = cfg.get_configuration()
config["daemon_name"] = "pvcnoded"
config["daemon_version"] = version
# Create and validate our directories
pvcnoded.util.config.validate_directories(config)
cfg.validate_directories(config)
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster node daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
logger.out("| ID: {0: <54} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
logger.out("|------------------------------------------------------------|")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster node daemon v{0: <22} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
@ -202,7 +200,7 @@ def entrypoint():
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
nonlocal logger, zkhandler, keepalive_timer, d_domain
logger.out("Terminating pvcnoded and cleaning up", state="s")
@ -251,13 +249,6 @@ def entrypoint():
except Exception:
pass
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@ -395,8 +386,8 @@ def entrypoint():
node_list = new_node_list
logger.out(
f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}',
state="i",
f'{logger.fmt_cyan}Node list:{logger.fmt_end} {" ".join(node_list)}',
state="s",
)
# Update node objects lists
@ -563,8 +554,8 @@ def entrypoint():
# Update the new list
network_list = new_network_list
logger.out(
f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}',
state="i",
f'{logger.fmt_cyan}Network list:{logger.fmt_end} {" ".join(network_list)}',
state="s",
)
# Update node objects list
@ -894,8 +885,8 @@ def entrypoint():
sriov_vf_list = sorted(new_sriov_vf_list)
logger.out(
f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
state="i",
f'{logger.fmt_cyan}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
state="s",
)
if config["enable_hypervisor"]:
@ -921,8 +912,8 @@ def entrypoint():
# Update the new list
domain_list = new_domain_list
logger.out(
f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
state="i",
f'{logger.fmt_cyan}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
state="s",
)
# Update node objects' list
@ -948,8 +939,8 @@ def entrypoint():
# Update the new list
osd_list = new_osd_list
logger.out(
f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
state="i",
f'{logger.fmt_cyan}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
state="s",
)
# Pool objects
@ -973,8 +964,8 @@ def entrypoint():
# Update the new list
pool_list = new_pool_list
logger.out(
f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
state="i",
f'{logger.fmt_cyan}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
state="s",
)
# Volume objects (in each pool)
@ -1005,15 +996,10 @@ def entrypoint():
# Update the new list
volume_list[pool] = new_volume_list
logger.out(
f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
state="i",
f'{logger.fmt_cyan}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
state="s",
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node

View File

@ -131,11 +131,11 @@ class MetadataAPIInstance(object):
# Helper functions
def open_database(self):
conn = psycopg2.connect(
host=self.config["metadata_postgresql_host"],
port=self.config["metadata_postgresql_port"],
dbname=self.config["metadata_postgresql_dbname"],
user=self.config["metadata_postgresql_user"],
password=self.config["metadata_postgresql_password"],
host=self.config["api_postgresql_host"],
port=self.config["api_postgresql_port"],
dbname=self.config["api_postgresql_dbname"],
user=self.config["api_postgresql_user"],
password=self.config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=RealDictCursor)
return conn, cur
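
A minimal usage sketch for the renamed helper; the query and table name are illustrative, not from this diff:

conn, cur = self.open_database()
try:
    # RealDictCursor returns each row as a dict keyed by column name
    cur.execute("SELECT * FROM instance LIMIT 1")  # illustrative query
    row = cur.fetchone()
finally:
    cur.close()
    conn.close()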

View File

@ -700,6 +700,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now()
logger.out(
"Starting node keepalive run",
state="t",
)
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:

View File

@ -290,7 +290,7 @@ logging:
# Enable or disable cluster detail logging during keepalive events
log_cluster_details: yes
# Enable or disable monitoring detail logging during keepalive events
# Enable or disable monitoring detail logging during healthcheck events
log_monitoring_details: yes
# Number of VM console log lines to store in Zookeeper (per VM)

1
worker-daemon/daemon_lib Symbolic link
View File

@ -0,0 +1 @@
../daemon-common

24
worker-daemon/pvcworkerd.py Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvcworkerd.py - Worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcworkerd.Daemon # noqa: F401
pvcworkerd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster worker daemon unit file
[Unit]
Description = Parallel Virtual Cluster worker daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcworkerd.sh
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
10.*)
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
*)
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
esac

View File

@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Daemon.py - PVC Node Worker daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from celery import Celery
import daemon_lib.config as cfg
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import (
worker_run_benchmark,
)
from daemon_lib.vmbuilder import (
worker_create_vm,
)
version = "0.9.82"
config = cfg.get_configuration()
config["daemon_name"] = "pvcworkerd"
config["daemon_version"] = version
celery_task_uri = "redis://{}:{}{}".format(
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
"pvcworkerd",
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return worker_create_vm(
self,
config,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool)
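# Pattern note: each task below wraps its body in @ZKConnection(config), which
# supplies a connected zkhandler as the inner function's first argument for
# the duration of the call, keeping Zookeeper lifecycle out of the task body.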
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
def entrypoint():
pass
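
With entrypoint() left as a stub (Celery itself is launched by pvcworkerd.sh), these tasks are reached by name over the broker. A hedged sketch of how a client such as the reworked Flask API might dispatch one; the broker URI, domain, and queue name are illustrative:

from celery import Celery

app = Celery("pvcapid", broker="redis://10.0.0.1:6379/0")  # illustrative URI

result = app.send_task(
    "vm.flush_locks",  # task name registered above
    kwargs={"domain": "testvm", "force_unlock": False, "run_on": "hv1"},
    queue="hv1",  # per-node queue, matching --queues in pvcworkerd.sh
)
print(result.id)  # task UUID; poll result.status or call result.get()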

View File