Compare commits

...

19 Commits

Author SHA1 Message Date
102c3c3106 Port all Celery worker functions to discrete pkg
Moves all tasks run by the Celery worker into a discrete package/module
for easier installation. Also adjusts several parameters throughout to
accomplish this.
2023-11-30 02:24:54 -05:00
0c0fb65c62 Rework Flask API to route Celery tasks manually
Avoids needing to define any of these tasks here; they can all be
defined in the pvcworkerd code.
2023-11-30 00:40:09 -05:00
03a738f878 Move config parser into daemon_lib
And reformat/add config values for API.
2023-11-30 00:05:37 -05:00
4df5fdbca6 Update description of example conf 2023-11-29 21:21:51 -05:00
97eb63ebab Clean up config naming and dead files 2023-11-29 21:21:51 -05:00
4a2eba0961 Improve node output messages (from pvchealthd)
1. Output startup "list" entries in cyan with "s" state
2. Add start of keepalive run message
2023-11-29 21:21:51 -05:00
077dd8708f Add check start message 2023-11-29 21:21:51 -05:00
b6b5786c3b Output list in cyan (s state) 2023-11-29 21:21:51 -05:00
ad738dec40 Clean up plugin pycache too 2023-11-29 21:21:51 -05:00
d2b764a2c7 Output more details on startup 2023-11-29 21:21:51 -05:00
b8aecd9c83 Wait less time between restarts 2023-11-29 21:21:51 -05:00
11db3c5b20 Fix ordering during termination 2023-11-29 21:21:51 -05:00
7a7c975eff Ensure return from health shutdown 2023-11-29 21:21:51 -05:00
fa12a3c9b1 Permit buffered log appending 2023-11-29 21:21:51 -05:00
787f4216b3 Expand Zookeeper log daemon prefix to match 2023-11-29 21:21:51 -05:00
647cba3cf5 Expand startup width for new daemon name 2023-11-29 21:21:51 -05:00
921ecb3a05 Fix name in kydb plugin 2023-11-29 21:21:51 -05:00
6a68cf665b Wait between service restarts 2023-11-29 21:21:51 -05:00
41f4e4fb2f Split health monitoring into discrete daemon/pkg 2023-11-29 21:21:51 -05:00
58 changed files with 1227 additions and 572 deletions

View File

@@ -19,13 +19,13 @@
 #                                                                             #
 ###############################################################################
-import os
-import yaml
 from ssl import SSLContext, TLSVersion
 from distutils.util import strtobool as dustrtobool
+import daemon_lib.config as cfg
 # Daemon version
 version = "0.9.82"
@@ -53,160 +53,13 @@ def strtobool(stringv):
 # Configuration Parsing
 ##########################################################
-# Parse the configuration file
-config_file = None
-try:
-    _config_file = "/etc/pvc/pvcapid.yaml"
-    if not os.path.exists(_config_file):
-        raise
-    config_file = _config_file
-    config_type = "legacy"
-except Exception:
-    pass
-try:
-    _config_file = os.environ["PVC_CONFIG_FILE"]
-    if not os.path.exists(_config_file):
-        raise
-    config_file = _config_file
-    config_type = "current"
-except Exception:
-    pass
-if not config_file:
-    print(
-        'Error: The "PVC_CONFIG_FILE" environment variable must be set before starting pvcapid.'
-    )
-    exit(1)
+# Get our configuration
+config = cfg.get_configuration()
+config["daemon_name"] = "pvcapid"
+config["daemon_version"] = version
-def load_configuration_file(config_file):
-    print('Loading configuration from file "{}"'.format(config_file))
-    # Read in the config
-    try:
-        with open(config_file, "r") as cfgfile:
-            o_config = yaml.load(cfgfile, Loader=yaml.BaseLoader)
-    except Exception as e:
-        print("ERROR: Failed to parse configuration file: {}".format(e))
-        exit(1)
-    return o_config
-def get_configuration_current(config_file):
-    o_config = load_configuration_file(config_file)
-    try:
-        # Create the config object
-        config = {
-            "debug": strtobool(o_config["logging"].get("debug_logging", "False")),
-            "all_nodes": o_config["cluster"]["all_nodes"],
-            "coordinators": o_config["cluster"]["coordinator_nodes"],
-            "listen_address": o_config["api"]["listen"]["address"],
-            "listen_port": int(o_config["api"]["listen"]["port"]),
-            "auth_enabled": strtobool(
-                o_config["api"]["authentication"].get("enabled", "False")
-            ),
-            "auth_secret_key": o_config["api"]["authentication"]["secret_key"],
-            "auth_source": o_config["api"]["authentication"]["source"],
-            "ssl_enabled": strtobool(o_config["api"]["ssl"].get("enabled", "False")),
-            "ssl_cert_file": o_config["api"]["ssl"]["certificate"],
-            "ssl_key_file": o_config["api"]["ssl"]["private_key"],
-            "database_port": o_config["database"]["postgres"]["port"],
-            "database_host": o_config["database"]["postgres"]["hostname"],
-            "database_name": o_config["database"]["postgres"]["credentials"]["api"][
-                "database"
-            ],
-            "database_user": o_config["database"]["postgres"]["credentials"]["api"][
-                "username"
-            ],
-            "database_password": o_config["database"]["postgres"]["credentials"]["api"][
-                "password"
-            ],
-            "queue_port": o_config["database"]["keydb"]["port"],
-            "queue_host": o_config["database"]["keydb"]["hostname"],
-            "queue_path": o_config["database"]["keydb"]["path"],
-            "storage_domain": o_config["cluster"]["networks"]["storage"]["domain"],
-            "storage_hosts": o_config["ceph"].get("monitor_hosts", None),
-            "ceph_monitor_port": o_config["ceph"]["monitor_port"],
-            "ceph_storage_secret_uuid": o_config["ceph"]["secret_uuid"],
-        }
-        # Use coordinators as storage hosts if not explicitly specified
-        if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
-            config["storage_hosts"] = config["coordinators"]
-        # Set up our token list if specified
-        if config["auth_source"] == "token":
-            config["auth_tokens"] = o_config["api"]["token"]
-        else:
-            if config["auth_enabled"]:
-                print(
-                    "WARNING: No authentication method provided; disabling authentication."
-                )
-                config["auth_enabled"] = False
-    except Exception as e:
-        print(f"ERROR: Failed to load configuration: {e}")
-        exit(1)
-    return config
-def get_configuration_legacy(config_file):
-    o_config = load_configuration_file(config_file)
-    try:
-        # Create the config object
-        config = {
-            "debug": strtobool(o_config["pvc"]["debug"]),
-            "coordinators": o_config["pvc"]["coordinators"],
-            "listen_address": o_config["pvc"]["api"]["listen_address"],
-            "listen_port": int(o_config["pvc"]["api"]["listen_port"]),
-            "auth_enabled": strtobool(
-                o_config["pvc"]["api"]["authentication"]["enabled"]
-            ),
-            "auth_secret_key": o_config["pvc"]["api"]["authentication"]["secret_key"],
-            "auth_tokens": o_config["pvc"]["api"]["authentication"]["tokens"],
-            "ssl_enabled": strtobool(o_config["pvc"]["api"]["ssl"]["enabled"]),
-            "ssl_key_file": o_config["pvc"]["api"]["ssl"]["key_file"],
-            "ssl_cert_file": o_config["pvc"]["api"]["ssl"]["cert_file"],
-            "database_host": o_config["pvc"]["provisioner"]["database"]["host"],
-            "database_port": int(o_config["pvc"]["provisioner"]["database"]["port"]),
-            "database_name": o_config["pvc"]["provisioner"]["database"]["name"],
-            "database_user": o_config["pvc"]["provisioner"]["database"]["user"],
-            "database_password": o_config["pvc"]["provisioner"]["database"]["pass"],
-            "queue_host": o_config["pvc"]["provisioner"]["queue"]["host"],
-            "queue_port": o_config["pvc"]["provisioner"]["queue"]["port"],
-            "queue_path": o_config["pvc"]["provisioner"]["queue"]["path"],
-            "storage_hosts": o_config["pvc"]["provisioner"]["ceph_cluster"][
-                "storage_hosts"
-            ],
-            "storage_domain": o_config["pvc"]["provisioner"]["ceph_cluster"][
-                "storage_domain"
-            ],
-            "ceph_monitor_port": o_config["pvc"]["provisioner"]["ceph_cluster"][
-                "ceph_monitor_port"
-            ],
-            "ceph_storage_secret_uuid": o_config["pvc"]["provisioner"]["ceph_cluster"][
-                "ceph_storage_secret_uuid"
-            ],
-        }
-        # Use coordinators as storage hosts if not explicitly specified
-        if not config["storage_hosts"]:
-            config["storage_hosts"] = config["coordinators"]
-    except Exception as e:
-        print("ERROR: Failed to load configuration: {}".format(e))
-        exit(1)
-    return config
-if config_type == "legacy":
-    config = get_configuration_legacy(config_file)
-else:
-    config = get_configuration_current(config_file)
 ##########################################################
 # Entrypoint
 ##########################################################
@@ -215,41 +68,43 @@ else:
 def entrypoint():
     import pvcapid.flaskapi as pvc_api  # noqa: E402
-    if config["ssl_enabled"]:
+    if config["api_ssl_enabled"]:
         context = SSLContext()
         context.minimum_version = TLSVersion.TLSv1
         context.get_ca_certs()
-        context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"])
+        context.load_cert_chain(
+            config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
+        )
     else:
         context = None
     # Print our startup messages
     print("")
-    print("|------------------------------------------------------------|")
+    print("|--------------------------------------------------------------|")
     print("| |")
     print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
     print("| ██ ▜█▙ ▟█▛ ██ |")
     print("| ███████████ ▜█▙ ▟█▛ ██ |")
     print("| ██ ▜█▙▟█▛ ███████████ |")
     print("| |")
-    print("|------------------------------------------------------------|")
+    print("|--------------------------------------------------------------|")
-    print("| Parallel Virtual Cluster API daemon v{0: <21} |".format(version))
-    print("| Debug: {0: <51} |".format(str(config["debug"])))
-    print("| API version: v{0: <44} |".format(API_VERSION))
-    print(
-        "| Listen: {0: <50} |".format(
-            "{}:{}".format(config["listen_address"], config["listen_port"])
-        )
-    )
-    print("| SSL: {0: <53} |".format(str(config["ssl_enabled"])))
-    print("| Authentication: {0: <42} |".format(str(config["auth_enabled"])))
-    print("|------------------------------------------------------------|")
+    print("| Parallel Virtual Cluster API daemon v{0: <23} |".format(version))
+    print("| Debug: {0: <53} |".format(str(config["debug"])))
+    print("| API version: v{0: <46} |".format(API_VERSION))
+    print(
+        "| Listen: {0: <52} |".format(
+            "{}:{}".format(config["api_listen_address"], config["api_listen_port"])
+        )
+    )
+    print("| SSL: {0: <55} |".format(str(config["api_ssl_enabled"])))
+    print("| Authentication: {0: <44} |".format(str(config["api_auth_enabled"])))
+    print("|--------------------------------------------------------------|")
     print("")
     pvc_api.celery_startup()
     pvc_api.app.run(
-        config["listen_address"],
-        config["listen_port"],
+        config["api_listen_address"],
+        config["api_listen_port"],
         threaded=True,
         ssl_context=context,
     )
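The net effect of this file's diff: pvcapid no longer carries its own YAML parser or legacy/current detection, every daemon calls one shared parser and only stamps its own identity on the result. A minimal sketch of that pattern, with illustrative file layout and keys (the real get_configuration() lives in daemon_lib/config.py, part of which appears further down this compare):

import os
import yaml


def get_configuration():
    # The shared parser reads one cluster-wide config file...
    config_file = os.environ.get("PVC_CONFIG_FILE", "/etc/pvc/pvc.conf")
    with open(config_file, "r") as fh:
        o_config = yaml.safe_load(fh)
    # ...and flattens it into prefixed keys that every daemon understands
    o_api = o_config["api"]
    return {
        "api_listen_address": o_api["listen"]["address"],
        "api_listen_port": o_api["listen"]["port"],
    }


config = get_configuration()
config["daemon_name"] = "pvcapid"     # each daemon adds only its identity
config["daemon_version"] = "0.9.82"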

View File

@@ -31,25 +31,12 @@ from uuid import uuid4
 from daemon_lib.common import getPrimaryNode
 from daemon_lib.zkhandler import ZKConnection
 from daemon_lib.node import get_list as get_node_list
-from daemon_lib.vm import (
-    vm_worker_flush_locks,
-    vm_worker_attach_device,
-    vm_worker_detach_device,
-)
+from daemon_lib.benchmark import list_benchmarks
-from daemon_lib.ceph import (
-    osd_worker_add_osd,
-    osd_worker_replace_osd,
-    osd_worker_refresh_osd,
-    osd_worker_remove_osd,
-    osd_worker_add_db_vg,
-)
 from pvcapid.Daemon import config, strtobool, API_VERSION
 import pvcapid.helper as api_helper
 import pvcapid.provisioner as api_provisioner
-import pvcapid.vmbuilder as api_vmbuilder
-import pvcapid.benchmark as api_benchmark
 import pvcapid.ova as api_ova
 from flask_sqlalchemy import SQLAlchemy
@@ -61,11 +48,11 @@ app = flask.Flask(__name__)
 # Set up SQLAlchemy backend
 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
 app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
-    config["database_user"],
-    config["database_password"],
-    config["database_host"],
-    config["database_port"],
-    config["database_name"],
+    config["api_postgresql_user"],
+    config["api_postgresql_password"],
+    config["api_postgresql_host"],
+    config["api_postgresql_port"],
+    config["api_postgresql_dbname"],
 )
 if config["debug"]:
@@ -73,8 +60,8 @@ if config["debug"]:
 else:
     app.config["DEBUG"] = False
-if config["auth_enabled"]:
-    app.config["SECRET_KEY"] = config["auth_secret_key"]
+if config["api_auth_enabled"]:
+    app.config["SECRET_KEY"] = config["api_auth_secret_key"]
 # Create SQLAlchemy database
 db = SQLAlchemy(app)
@@ -99,41 +86,34 @@ def get_primary_node(zkhandler):
     return getPrimaryNode(zkhandler)
-# Set up Celery queue routing
-def route_task(name, args, kwargs, options, task=None, **kw):
-    print("----")
-    print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
-    # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
-    if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
-        run_on = kwargs[options["routing_key"]]
-        if run_on == "primary":
-            run_on = get_primary_node()
-    # Otherwise, use the primary node
-    else:
-        run_on = get_primary_node()
-    print(f"Selected Celery worker: {run_on}")
-    print("----")
-    return run_on
-# Set up Celery task ID generator
-# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
-def run_celery_task(task_def, **kwargs):
-    task_id = str(uuid4()).split("-")[0]
-    task = task_def.apply_async(
-        (),
-        kwargs,
-        task_id=task_id,
-    )
-    return task
+# Set up Celery task ID generator
+# 1. Lets us make our own IDs (first section of UUID)
+# 2. Lets us distribute jobs to the required pvcworkerd instances
+def run_celery_task(task_name, **kwargs):
+    task_id = str(uuid4()).split("-")[0]
+    if "run_on" in kwargs and kwargs["run_on"] != "primary":
+        run_on = kwargs["run_on"]
+    else:
+        run_on = get_primary_node()
+    print(
+        f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
+    )
+    task = celery.send_task(
+        task_name,
+        task_id=task_id,
+        kwargs=kwargs,
+        queue=run_on,
+    )
+    return task
 # Create celery definition
 celery_task_uri = "redis://{}:{}{}".format(
-    config["queue_host"], config["queue_port"], config["queue_path"]
+    config["keydb_host"], config["keydb_port"], config["keydb_path"]
 )
 celery = Celery(
     app.name,
@@ -153,7 +133,6 @@ def celery_startup():
     app.config["task_queues"] = tuple(
         [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
     )
-    app.config["task_routes"] = (route_task,)
     celery.conf.update(app.config)
@@ -199,7 +178,7 @@ def Authenticator(function):
     @wraps(function)
     def authenticate(*args, **kwargs):
         # No authentication required
-        if not config["auth_enabled"]:
+        if not config["api_auth_enabled"]:
             return function(*args, **kwargs)
         # Session-based authentication
         if "token" in flask.session:
@@ -208,7 +187,7 @@ def Authenticator(function):
         if "X-Api-Key" in flask.request.headers:
             if any(
                 token
-                for token in config["auth_tokens"]
+                for token in config["api_auth_tokens"]
                 if flask.request.headers.get("X-Api-Key") == token.get("token")
             ):
                 return function(*args, **kwargs)
@@ -220,171 +199,6 @@ def Authenticator(function):
     return authenticate
-#
-# Job functions
-#
-@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
-def create_vm(
-    self,
-    vm_name=None,
-    profile_name=None,
-    define_vm=True,
-    start_vm=True,
-    script_run_args=[],
-    run_on="primary",
-):
-    return api_vmbuilder.create_vm(
-        self,
-        vm_name,
-        profile_name,
-        define_vm=define_vm,
-        start_vm=start_vm,
-        script_run_args=script_run_args,
-    )
-@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
-def run_benchmark(self, pool=None, run_on="primary"):
-    return api_benchmark.run_benchmark(self, pool)
-@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
-def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
-    @ZKConnection(config)
-    def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
-        return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
-    return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
-@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
-def vm_device_attach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_attach(zkhandler, self, domain, xml):
-        return vm_worker_attach_device(zkhandler, self, domain, xml)
-    return run_vm_device_attach(self, domain, xml)
-@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
-def vm_device_detach(self, domain=None, xml=None, run_on=None):
-    @ZKConnection(config)
-    def run_vm_device_detach(zkhandler, self, domain, xml):
-        return vm_worker_detach_device(zkhandler, self, domain, xml)
-    return run_vm_device_detach(self, domain, xml)
-@celery.task(name="osd.add", bind=True, routing_key="run_on")
-def osd_add(
-    self,
-    device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    split_count=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_add(
-        zkhandler,
-        self,
-        run_on,
-        device,
-        weight,
-        ext_db_ratio=None,
-        ext_db_size=None,
-        split_count=None,
-    ):
-        return osd_worker_add_osd(
-            zkhandler,
-            self,
-            run_on,
-            device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-            split_count,
-        )
-    return run_osd_add(
-        self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
-    )
-@celery.task(name="osd.replace", bind=True, routing_key="run_on")
-def osd_replace(
-    self,
-    osd_id=None,
-    new_device=None,
-    old_device=None,
-    weight=None,
-    ext_db_ratio=None,
-    ext_db_size=None,
-    run_on=None,
-):
-    @ZKConnection(config)
-    def run_osd_replace(
-        zkhandler,
-        self,
-        run_on,
-        osd_id,
-        new_device,
-        old_device=None,
-        weight=None,
-        ext_db_ratio=None,
-        ext_db_size=None,
-    ):
-        return osd_worker_replace_osd(
-            zkhandler,
-            self,
-            run_on,
-            osd_id,
-            new_device,
-            old_device,
-            weight,
-            ext_db_ratio,
-            ext_db_size,
-        )
-    return run_osd_replace(
-        self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
-    )
-@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
-def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
-        return osd_worker_refresh_osd(
-            zkhandler, self, run_on, osd_id, device, ext_db_flag
-        )
-    return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
-@celery.task(name="osd.remove", bind=True, routing_key="run_on")
-def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
-    @ZKConnection(config)
-    def run_osd_remove(
-        zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
-    ):
-        return osd_worker_remove_osd(
-            zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
-        )
-    return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
-@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
-def osd_add_db_vg(self, device=None, run_on=None):
-    @ZKConnection(config)
-    def run_osd_add_db_vg(zkhandler, self, run_on, device):
-        return osd_worker_add_db_vg(zkhandler, self, run_on, device)
-    return run_osd_add_db_vg(self, run_on, device)
 ##########################################################
 # API Root/Authentication
 ##########################################################
@@ -469,12 +283,12 @@ class API_Login(Resource):
               type: object
               id: Message
         """
-        if not config["auth_enabled"]:
+        if not config["api_auth_enabled"]:
            return flask.redirect(Api.url_for(api, API_Root))
        if any(
            token
-            for token in config["auth_tokens"]
+            for token in config["api_auth_tokens"]
            if flask.request.values["token"] in token["token"]
        ):
            flask.session["token"] = flask.request.form["token"]
@@ -503,7 +317,7 @@ class API_Logout(Resource):
            302:
              description: Authentication disabled
        """
-        if not config["auth_enabled"]:
+        if not config["api_auth_enabled"]:
            return flask.redirect(Api.url_for(api, API_Root))
        flask.session.pop("token", None)
@@ -2459,7 +2273,7 @@ class API_VM_Locks(Resource):
         else:
             return vm_node_detail, retcode
-        task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
+        task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
         return (
             {"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@@ -2594,7 +2408,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
-        task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
         return (
             {"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@@ -2648,7 +2462,7 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
-        task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
+        task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
         return (
             {"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@@ -4364,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
                 type: string (integer)
                 description: The number of minor page faults during the test
         """
-        return api_benchmark.list_benchmarks(reqargs.get("job", None))
+        return list_benchmarks(config, reqargs.get("job", None))
     @RequestParser(
         [
@@ -4405,7 +4219,7 @@ class API_Storage_Ceph_Benchmark(Resource):
             }, 400
         task = run_celery_task(
-            run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
+            "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
         )
         return (
             {
@@ -4531,7 +4345,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
         node = reqargs.get("node", None)
         task = run_celery_task(
-            osd_add_db_vg, device=reqargs.get("device", None), run_on=node
+            "osd.add_db_vg", device=reqargs.get("device", None), run_on=node
         )
         return (
@@ -4730,7 +4544,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
         node = reqargs.get("node", None)
         task = run_celery_task(
-            osd_add,
+            "osd.add",
             device=reqargs.get("device", None),
             weight=reqargs.get("weight", None),
             ext_db_ratio=reqargs.get("ext_db_ratio", None),
@@ -4849,7 +4663,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
         task = run_celery_task(
-            osd_replace,
+            "osd.replace",
             osd_id=osdid,
             new_device=reqargs.get("new_device"),
             old_device=reqargs.get("old_device", None),
@@ -4907,7 +4721,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
         task = run_celery_task(
-            osd_refresh,
+            "osd.refresh",
             osd_id=osdid,
             device=reqargs.get("device", None),
             ext_db_flag=False,
@@ -4978,7 +4792,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
             return osd_node_detail, retcode
         task = run_celery_task(
-            osd_remove,
+            "osd.remove",
             osd_id=osdid,
             force_flag=reqargs.get("force", False),
             run_on=node,
@@ -8507,7 +8321,7 @@ class API_Provisioner_Create_Root(Resource):
             start_vm = False
         task = run_celery_task(
-            create_vm,
+            "provisioner.create",
             vm_name=reqargs.get("name", None),
             profile_name=reqargs.get("profile", None),
             define_vm=define_vm,
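The dispatch rework in this file replaces Celery's task_routes hook with explicit send_task() calls: the API now knows only task names and per-node queues, which is what lets the task bodies live entirely in pvcworkerd. A standalone sketch of the same mechanics; the broker URI and node names below are placeholders, not values from this compare:

from uuid import uuid4

from celery import Celery
from kombu import Queue

celery = Celery("pvcapid", broker="redis://127.0.0.1:6379/0")
# One queue per node; each pvcworkerd instance consumes its own hostname queue
celery.conf.task_queues = tuple(
    Queue(h, routing_key=f"{h}.#") for h in ("hv1", "hv2", "hv3")
)


def run_celery_task(task_name, **kwargs):
    task_id = str(uuid4()).split("-")[0]  # short, human-friendly task ID
    run_on = kwargs.get("run_on", "hv1")  # worker selection, simplified here
    # send_task() needs no importable function, only the registered task name
    return celery.send_task(task_name, task_id=task_id, kwargs=kwargs, queue=run_on)


# Usage mirrors the call sites above, e.g.:
# task = run_celery_task("vm.flush_locks", domain="myvm", run_on="hv2")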

View File

@@ -48,11 +48,11 @@ import pvcapid.provisioner as provisioner
 # Database connections
 def open_database(config):
     conn = psycopg2.connect(
-        host=config["database_host"],
-        port=config["database_port"],
-        dbname=config["database_name"],
-        user=config["database_user"],
-        password=config["database_password"],
+        host=config["api_postgresql_host"],
+        port=config["api_postgresql_port"],
+        dbname=config["api_postgresql_dbname"],
+        user=config["api_postgresql_user"],
+        password=config["api_postgresql_password"],
     )
     cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
     return conn, cur

View File

@@ -63,11 +63,11 @@ class ProvisioningError(Exception):
 # Database connections
 def open_database(config):
     conn = psycopg2.connect(
-        host=config["database_host"],
-        port=config["database_port"],
-        dbname=config["database_name"],
-        user=config["database_user"],
-        password=config["database_password"],
+        host=config["api_postgresql_host"],
+        port=config["api_postgresql_port"],
+        dbname=config["api_postgresql_dbname"],
+        user=config["api_postgresql_user"],
+        password=config["api_postgresql_password"],
     )
     cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
     return conn, cur

View File

@@ -1,16 +0,0 @@
-# Parallel Virtual Cluster Celery Worker daemon unit file
-[Unit]
-Description = Parallel Virtual Cluster Celery Worker daemon
-After = network-online.target
-[Service]
-Type = simple
-WorkingDirectory = /usr/share/pvc
-Environment = PYTHONUNBUFFERED=true
-Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
-ExecStart = /usr/share/pvc/pvcworkerd.sh
-Restart = on-failure
-[Install]
-WantedBy = multi-user.target

View File

@@ -42,7 +42,7 @@ echo " done. Package version ${version}."
 # Install the client(s) locally
 echo -n "Installing client packages locally..."
-$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null
+$SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
 echo " done".
 for HOST in ${HOSTS[@]}; do
@@ -59,12 +59,16 @@ fi
 for HOST in ${HOSTS[@]}; do
 echo "> Deploying packages to host ${HOST}"
 echo -n "Installing packages..."
-ssh $HOST $SUDO dpkg -i /tmp/pvc/*.deb &>/dev/null
+ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
 ssh $HOST rm -rf /tmp/pvc &>/dev/null
 echo " done."
 echo -n "Restarting PVC daemons..."
 ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
+sleep 2
 ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
+sleep 2
+ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
+sleep 2
 ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
 echo " done."
 echo -n "Waiting for node daemon to be running..."

View File

@@ -13,9 +13,11 @@ echo ${new_ver} >&3
 tmpdir=$( mktemp -d )
 cp -a debian/changelog client-cli/setup.py ${tmpdir}/
 cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
+cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
+cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
 cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
 # Replace the "base" version with the git revision version
-sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
+sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
 sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
 cat <<EOF > debian/changelog
 pvc (${new_ver}) unstable; urgency=medium
@@ -33,6 +35,8 @@ dpkg-buildpackage -us -uc
 cp -a ${tmpdir}/changelog debian/changelog
 cp -a ${tmpdir}/setup.py client-cli/setup.py
 cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
+cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
+cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
 cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
 # Clean up

View File

@@ -20,6 +20,8 @@ changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
 rm ${changelog_file}
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
+sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," health-daemon/pvchealthd/Daemon.py
+sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," worker-daemon/pvcworkerd/Daemon.py
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
 sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
 echo ${new_version} > .version
@@ -46,7 +48,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
 echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
 mv ${deb_changelog_file} debian/changelog
-git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
+git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
 git commit -v
 popd &>/dev/null

View File

@@ -25,9 +25,6 @@ import psycopg2.extras
 from datetime import datetime
 from json import loads, dumps
-from pvcapid.Daemon import config
-from daemon_lib.zkhandler import ZKHandler
 from daemon_lib.celery import start, fail, log_info, update, finish
 import daemon_lib.common as pvc_common
@@ -135,11 +132,11 @@ def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
 # Database connections
 def open_database(config):
     conn = psycopg2.connect(
-        host=config["database_host"],
-        port=config["database_port"],
-        dbname=config["database_name"],
-        user=config["database_user"],
-        password=config["database_password"],
+        host=config["api_postgresql_host"],
+        port=config["api_postgresql_port"],
+        dbname=config["api_postgresql_dbname"],
+        user=config["api_postgresql_user"],
+        password=config["api_postgresql_password"],
     )
     cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
     return conn, cur
@@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
     conn.close()
-def list_benchmarks(job=None):
+def list_benchmarks(config, job=None):
     if job is not None:
         query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
         args = (job,)
@@ -278,17 +275,8 @@ def run_benchmark_job(
     return jstdout
-def run_benchmark(self, pool):
+def worker_run_benchmark(zkhandler, celery, config, pool):
     # Phase 0 - connect to databases
-    try:
-        zkhandler = ZKHandler(config)
-        zkhandler.connect()
-    except Exception:
-        fail(
-            self,
-            "Failed to connect to Zookeeper",
-        )
     cur_time = datetime.now().isoformat(timespec="seconds")
     cur_primary = zkhandler.read("base.config.primary_node")
     job_name = f"{cur_time}_{cur_primary}"
@@ -296,7 +284,7 @@ def run_benchmark(self, pool):
     current_stage = 0
     total_stages = 13
     start(
-        self,
+        celery,
         f"Running storage benchmark '{job_name}' on pool '{pool}'",
         current=current_stage,
         total=total_stages,
@@ -312,13 +300,13 @@ def run_benchmark(self, pool):
             zkhandler=zkhandler,
         )
         fail(
-            self,
+            celery,
            "Failed to connect to Postgres",
        )
    current_stage += 1
    update(
-        self,
+        celery,
        "Storing running status in database",
        current=current_stage,
        total=total_stages,
@@ -340,11 +328,11 @@ def run_benchmark(self, pool):
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
-        fail(self, f"Failed to store running status: {e}", exception=BenchmarkError)
+        fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)
    current_stage += 1
    update(
-        self,
+        celery,
        "Creating benchmark volume",
        current=current_stage,
        total=total_stages,
@@ -363,7 +351,7 @@ def run_benchmark(self, pool):
    for test in test_matrix:
        current_stage += 1
        update(
-            self,
+            celery,
            f"Running benchmark job '{test}'",
            current=current_stage,
            total=total_stages,
@@ -381,7 +369,7 @@ def run_benchmark(self, pool):
    # Phase 3 - cleanup
    current_stage += 1
    update(
-        self,
+        celery,
        "Cleaning up benchmark volume",
        current=current_stage,
        total=total_stages,
@@ -397,7 +385,7 @@ def run_benchmark(self, pool):
    current_stage += 1
    update(
-        self,
+        celery,
        "Storing results in database",
        current=current_stage,
        total=total_stages,
@@ -415,7 +403,7 @@ def run_benchmark(self, pool):
            db_cur=db_cur,
            zkhandler=zkhandler,
        )
-        fail(self, f"Failed to store test results: {e}", exception=BenchmarkError)
+        fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)
    cleanup(
        job_name,
@@ -426,7 +414,7 @@ def run_benchmark(self, pool):
    current_stage += 1
    return finish(
-        self,
+        celery,
        f"Storage benchmark {job_name} completed successfully",
        current=current_stage,
        total=total_stages,
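With worker_run_benchmark() now taking zkhandler, celery, and config as arguments, the Celery binding itself moves out of this module. The pvcworkerd side is not part of this compare, so the wrapper below is a hypothetical sketch: the task name and the called function come from this diff, but the app wiring, broker URI, and connect/disconnect handling are assumptions.

from celery import Celery

import daemon_lib.config as cfg
from daemon_lib.benchmark import worker_run_benchmark
from daemon_lib.zkhandler import ZKHandler

config = cfg.get_configuration()
celery = Celery("pvcworkerd", broker="redis://127.0.0.1:6379/0")  # placeholder wiring


@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
    # The worker, not the API daemon, now owns the Zookeeper connection
    zkhandler = ZKHandler(config)
    zkhandler.connect()
    try:
        return worker_run_benchmark(zkhandler, self, config, pool)
    finally:
        zkhandler.disconnect()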

View File

@@ -271,17 +271,17 @@ def get_configuration_current(config_file):
         "keydb_port": o_database["keydb"]["port"],
         "keydb_host": o_database["keydb"]["hostname"],
         "keydb_path": o_database["keydb"]["path"],
-        "metadata_postgresql_port": o_database["postgres"]["port"],
-        "metadata_postgresql_host": o_database["postgres"]["hostname"],
-        "metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
+        "api_postgresql_port": o_database["postgres"]["port"],
+        "api_postgresql_host": o_database["postgres"]["hostname"],
+        "api_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
             "database"
         ],
-        "metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
+        "api_postgresql_user": o_database["postgres"]["credentials"]["api"][
             "username"
         ],
-        "metadata_postgresql_password": o_database["postgres"]["credentials"][
-            "api"
-        ]["password"],
+        "api_postgresql_password": o_database["postgres"]["credentials"]["api"][
+            "password"
+        ],
         "pdns_postgresql_port": o_database["postgres"]["port"],
         "pdns_postgresql_host": o_database["postgres"]["hostname"],
         "pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
@@ -335,9 +335,7 @@ def get_configuration_current(config_file):
         "log_keepalive_cluster_details": o_logging.get(
             "log_cluster_details", False
         ),
-        "log_keepalive_plugin_details": o_logging.get(
-            "log_monitoring_details", False
-        ),
+        "log_monitoring_details": o_logging.get("log_monitoring_details", False),
         "console_log_lines": o_logging.get("console_log_lines", False),
         "node_log_lines": o_logging.get("node_log_lines", False),
     }
@@ -362,9 +360,49 @@ def get_configuration_current(config_file):
         + o_ceph["ceph_keyring_file"],
         "ceph_monitor_port": o_ceph["monitor_port"],
         "ceph_secret_uuid": o_ceph["secret_uuid"],
+        "storage_hosts": o_ceph.get("monitor_hosts", None),
     }
     config = {**config, **config_ceph}
+    o_api = o_config["api"]
+    o_api_listen = o_api["listen"]
+    config_api_listen = {
+        "api_listen_address": o_api_listen["address"],
+        "api_listen_port": o_api_listen["port"],
+    }
+    config = {**config, **config_api_listen}
+    o_api_authentication = o_api["authentication"]
+    config_api_authentication = {
+        "api_auth_enabled": o_api_authentication.get("enabled", False),
+        "api_auth_secret_key": o_api_authentication.get("secret_key", ""),
+        "api_auth_source": o_api_authentication.get("source", "token"),
+    }
+    config = {**config, **config_api_authentication}
+    o_api_ssl = o_api["ssl"]
+    config_api_ssl = {
+        "api_ssl_enabled": o_api_ssl.get("enabled", False),
+        "api_ssl_cert_file": o_api_ssl.get("certificate", None),
+        "api_ssl_key_file": o_api_ssl.get("private_key", None),
+    }
+    config = {**config, **config_api_ssl}
+    # Use coordinators as storage hosts if not explicitly specified
+    if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
+        config["storage_hosts"] = config["coordinators"]
+    # Set up our token list if specified
+    if config["api_auth_source"] == "token":
+        config["api_auth_tokens"] = o_api["token"]
+    else:
+        if config["api_auth_enabled"]:
+            print(
+                "WARNING: No authentication method provided; disabling API authentication."
+            )
+            config["api_auth_enabled"] = False
     # Add our node static data to the config
     config["static_data"] = get_static_data()

View File

@@ -80,7 +80,7 @@ class Logger(object):
                 self.config["log_directory"] + "/" + self.config["daemon_name"] + ".log"
             )
             # We open the logfile for the duration of our session, but have a hup function
-            self.writer = open(self.logfile, "a", buffering=0)
+            self.writer = open(self.logfile, "a")
         self.last_colour = ""
         self.last_prompt = ""
@@ -93,12 +93,10 @@ class Logger(object):
     # Provide a hup function to close and reopen the writer
     def hup(self):
         self.writer.close()
-        self.writer = open(self.logfile, "a", buffering=0)
+        self.writer = open(self.logfile, "a")
     # Provide a termination function so all messages are flushed before terminating the main daemon
     def terminate(self):
-        if self.config["file_logging"]:
-            self.writer.close()
         if self.config["zookeeper_logging"]:
             self.out("Waiting for Zookeeper message queue to drain", state="s")
@@ -112,6 +110,9 @@ class Logger(object):
             self.zookeeper_logger.stop()
             self.zookeeper_logger.join()
+        if self.config["file_logging"]:
+            self.writer.close()
     # Output function
     def out(self, message, state=None, prefix=""):
         # Get the date
@@ -152,11 +153,14 @@ class Logger(object):
         # Assemble output string
         output = colour + prompt + endc + date + prefix + message
         self.writer.write(output + "\n")
+        self.writer.flush()
         # Log to Zookeeper
         if self.config["zookeeper_logging"]:
             # Set the daemon value (only used here as others do not overlap with different daemons)
             daemon = f"{self.config['daemon_name']}: "
+            # Expand to match all daemon names (pvcnoded, pvcworkerd, pvchealthd)
+            daemon = "{daemon: <12}".format(daemon=daemon)
             # Assemble output string
             output = daemon + colour + prompt + endc + date + prefix + message
             self.zookeeper_queue.put(output)
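Context for the buffering change above ("Permit buffered log appending"): Python 3 only allows buffering=0 on binary-mode files, so open(path, "a", buffering=0) raises ValueError; the replacement keeps an ordinary buffered text writer and flushes after every message. A self-contained demonstration of the working pattern:

# open(path, "a", buffering=0) raises "ValueError: can't have unbuffered
# text I/O" on Python 3; buffered append plus an explicit flush gives the
# same prompt on-disk output without the error.
logfile = "/tmp/pvc-logger-demo.log"  # illustrative path

writer = open(logfile, "a")
try:
    writer.write("keepalive run started\n")
    writer.flush()  # push the line out immediately, as Logger.out() now does
finally:
    writer.close()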

View File

@@ -31,8 +31,6 @@ import uuid
 from contextlib import contextmanager
-from pvcapid.Daemon import config
 from daemon_lib.zkhandler import ZKHandler
 from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
@@ -167,11 +165,11 @@ def chroot(destination):
 def open_db(config):
     try:
         conn = psycopg2.connect(
-            host=config["database_host"],
-            port=config["database_port"],
-            dbname=config["database_name"],
-            user=config["database_user"],
-            password=config["database_password"],
+            host=config["api_postgresql_host"],
+            port=config["api_postgresql_port"],
+            dbname=config["api_postgresql_dbname"],
+            user=config["api_postgresql_user"],
+            password=config["api_postgresql_password"],
         )
         cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
     except Exception:
@@ -216,8 +214,14 @@ def open_zk(config):
 #
 # Main VM provisioning function - executed by the Celery worker
 #
-def create_vm(
-    celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
+def worker_create_vm(
+    celery,
+    config,
+    vm_name,
+    vm_profile,
+    define_vm=True,
+    start_vm=True,
+    script_run_args=[],
 ):
     current_stage = 0
     total_stages = 11
@@ -326,9 +330,9 @@ def create_vm(
     vm_data["system_architecture"] = stdout.strip()
     monitor_list = list()
-    coordinator_names = config["storage_hosts"]
-    for coordinator in coordinator_names:
-        monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
+    monitor_names = config["storage_hosts"]
+    for monitor in monitor_names:
+        monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
     vm_data["ceph_monitor_list"] = monitor_list
     vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
     vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]

debian/control
View File

@@ -4,19 +4,35 @@ Priority: optional
 Maintainer: Joshua Boniface <joshua@boniface.me>
 Standards-Version: 3.9.8
 Homepage: https://www.boniface.me
-X-Python3-Version: >= 3.2
+X-Python3-Version: >= 3.7
 Package: pvc-daemon-node
 Architecture: all
-Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
+Depends: systemd, pvc-daemon-common, pvc-daemon-health, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
 Description: Parallel Virtual Cluster node daemon
  A KVM/Zookeeper/Ceph-based VM and private cloud manager
  .
  This package installs the PVC node daemon
+Package: pvc-daemon-health
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
+Description: Parallel Virtual Cluster health daemon
+ A KVM/Zookeeper/Ceph-based VM and private cloud manager
+ .
+ This package installs the PVC health monitoring daemon
+Package: pvc-daemon-worker
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
+Description: Parallel Virtual Cluster worker daemon
+ A KVM/Zookeeper/Ceph-based VM and private cloud manager
+ .
+ This package installs the PVC Celery task worker daemon
 Package: pvc-daemon-api
 Architecture: all
-Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
+Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
 Description: Parallel Virtual Cluster API daemon
  A KVM/Zookeeper/Ceph-based VM and private cloud manager
  .

View File

@@ -3,7 +3,5 @@ api-daemon/pvcapid-manage*.py usr/share/pvc
 api-daemon/pvc-api-db-upgrade usr/share/pvc
 api-daemon/pvcapid usr/share/pvc
 api-daemon/pvcapid.service lib/systemd/system
-api-daemon/pvcworkerd.service lib/systemd/system
-api-daemon/pvcworkerd.sh usr/share/pvc
 api-daemon/provisioner usr/share/pvc
 api-daemon/migrations usr/share/pvc

debian/pvc-daemon-api.preinst (new file)
View File

@@ -0,0 +1,5 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.install (new file)
View File

@@ -0,0 +1,4 @@
+health-daemon/pvchealthd.py usr/share/pvc
+health-daemon/pvchealthd usr/share/pvc
+health-daemon/pvchealthd.service lib/systemd/system
+health-daemon/plugins usr/share/pvc

debian/pvc-daemon-health.postinst (new file)
View File

@@ -0,0 +1,14 @@
+#!/bin/sh
+
+# Reload systemd's view of the units
+systemctl daemon-reload
+
+# Enable the service and target
+systemctl enable /lib/systemd/system/pvchealthd.service
+
+# Inform administrator of the service restart/startup not occurring automatically
+if systemctl is-active --quiet pvchealthd.service; then
+    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
+else
+    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
+fi

debian/pvc-daemon-health.preinst (new file)
View File

@@ -0,0 +1,6 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
+find /usr/share/pvc/plugins -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-health.prerm (new file)
View File

@@ -0,0 +1,4 @@
+#!/bin/sh
+
+# Disable the services
+systemctl disable pvchealthd.service

View File

@@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
 node-daemon/pvc.target lib/systemd/system
 node-daemon/pvcautoready.service lib/systemd/system
 node-daemon/monitoring usr/share/pvc
-node-daemon/plugins usr/share/pvc

View File

@@ -2,4 +2,4 @@
 # Remove any cached CPython directories or files
 echo "Cleaning up existing CPython files"
-find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
+find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.install (new file)
View File

@@ -0,0 +1,4 @@
+worker-daemon/pvcworkerd.sh usr/share/pvc
+worker-daemon/pvcworkerd.py usr/share/pvc
+worker-daemon/pvcworkerd usr/share/pvc
+worker-daemon/pvcworkerd.service lib/systemd/system

debian/pvc-daemon-worker.postinst (new file)
View File

@@ -0,0 +1,14 @@
+#!/bin/sh
+
+# Reload systemd's view of the units
+systemctl daemon-reload
+
+# Enable the service and target
+systemctl enable /lib/systemd/system/pvcworkerd.service
+
+# Inform administrator of the service restart/startup not occurring automatically
+if systemctl is-active --quiet pvcworkerd.service; then
+    echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
+else
+    echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
+fi

debian/pvc-daemon-worker.preinst (new file)
View File

@@ -0,0 +1,5 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

debian/pvc-daemon-worker.prerm (new file)
View File

@@ -0,0 +1,4 @@
+#!/bin/sh
+
+# Disable the services
+systemctl disable pvcworkerd.service

View File

@@ -8,7 +8,7 @@ import os
 import sys
 import json
-os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
+os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"
 sys.path.append('api-daemon')

health-daemon/daemon_lib (symbolic link)
View File

@@ -0,0 +1 @@
+../daemon-common

View File

@@ -39,7 +39,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 # A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -40,7 +40,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 # A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 # A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -39,7 +39,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 # A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to
@@ -75,7 +75,7 @@ class MonitoringPluginScript(MonitoringPlugin):
         # Set the health delta to 0 (no change)
         health_delta = 0
         # Craft a message that can be used by the clients
-        message = "Successfully connected to Libvirtd on localhost"
+        message = "Successfully connected to KeyDB/Redis on localhost"

         # Check the Zookeeper connection
         try:
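The second hunk above retargets this plugin's success message from Libvirt to KeyDB/Redis; the probe body itself is not shown in the diff. A hypothetical sketch of such a check using the standard redis-py client (the function name, port, and health-delta penalty are assumptions):

import redis

def check_keydb(host="localhost", port=6379):
    health_delta = 0
    message = "Successfully connected to KeyDB/Redis on localhost"
    try:
        # KeyDB speaks the Redis protocol, so redis-py works unmodified
        redis.Redis(host=host, port=port, socket_timeout=2).ping()
    except Exception as e:
        health_delta = 50  # assumed penalty; the real value is not shown here
        message = f"Failed to connect to KeyDB/Redis: {e}"
    return health_delta, message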

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -39,7 +39,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

@@ -38,7 +38,7 @@
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

 # A monitoring plugin script must always expose its nice name, which must be identical to

health-daemon/pvchealthd.py Executable file

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvchealthd.py - Health daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.Daemon # noqa: F401
pvchealthd.Daemon.entrypoint()

@@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file

[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure

[Install]
WantedBy = pvc.target

@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Daemon.py - Health daemon main entrypoing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.util.zookeeper
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
import pvchealthd.objects.NodeInstance as NodeInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
from time import sleep
import os
import signal
# Daemon version
version = "0.9.82"
##########################################################
# Entrypoint
##########################################################
def entrypoint():
monitoring_instance = None
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvchealthd"
config["daemon_version"] = version
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster health daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
# Connect to Zookeeper and return our handler and current schema version
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, monitoring_instance
logger.out("Terminating pvchealthd and cleaning up", state="s")
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
del zkhandler
except Exception:
pass
logger.out("Terminated health daemon", state="s")
logger.terminate()
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum="", frame=""):
cleanup(failure=False)
# Hangup (logrotate) function
def hup(signum="", frame=""):
if config["file_logging"]:
logger.hup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGHUP, hup)
this_node = NodeInstance.NodeInstance(
config["node_hostname"],
zkhandler,
config,
logger,
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Tick loop; does nothing since everything is async
while True:
try:
sleep(1)
except Exception:
break
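The entrypoint above parks in a one-second tick loop and does all real work asynchronously; cleanup ends with os._exit(), which terminates immediately without waiting on background threads or atexit handlers. A stripped-down sketch of the same pattern (resource handles elided):

import os
import signal
from time import sleep

def cleanup(failure=False):
    # ... disconnect Zookeeper, stop timers, etc. ...
    os._exit(1 if failure else 0)  # hard exit; skips thread joins and atexit

def term(signum="", frame=""):
    cleanup(failure=False)

signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)

while True:  # tick loop; everything else runs in watcher/timer threads
    try:
        sleep(1)
    except Exception:
        break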

@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
-# PluginInstance.py - Class implementing a PVC monitoring instance
+# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
 # Part of the Parallel Virtual Cluster (PVC) system
 #
 # Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
@@ -317,8 +317,17 @@
             )

         if successful_plugins < 1:
+            self.logger.out(
+                "No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
+                state="e",
+            )
             return

+        self.logger.out(
+            f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
+            state="s",
+        )
+
         # Clean up any old plugin data for which a plugin file no longer exists
         plugins_data = self.zkhandler.children(
             ("node.monitoring.data", self.this_node.name)
@@ -344,6 +353,7 @@
     def shutdown(self):
         self.stop_check_timer()
         self.run_cleanups()
+        return

     def start_check_timer(self):
         check_interval = self.config["monitoring_interval"]
@@ -395,6 +405,11 @@
         active_coordinator_state = self.this_node.coordinator_state

         runtime_start = datetime.now()
+        self.logger.out(
+            "Starting monitoring healthcheck run",
+            state="t",
+        )

         total_health = 100
         plugin_results = list()
         with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
@@ -406,10 +421,7 @@
                 plugin_results.append(future.result())

         for result in sorted(plugin_results, key=lambda x: x.plugin_name):
-            if (
-                self.config["log_keepalives"]
-                and self.config["log_keepalive_plugin_details"]
-            ):
+            if self.config["log_monitoring_details"]:
                 self.logger.out(
                     result.message + f" [-{result.health_delta}]",
                     state="t",

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
class NodeInstance(object):
# Initialization function
def __init__(
self,
name,
zkhandler,
config,
logger,
):
# Passed-in variables on creation
self.name = name
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# States
self.daemon_state = "stop"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Node resources
self.health = 100
self.active_domains_count = 0
self.provisioned_domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.daemon", self.name)
)
def watch_node_daemonstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "stop"
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.router", self.name)
)
def watch_node_routerstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "client"
if data != self.coordinator_state:
self.coordinator_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.domain", self.name)
)
def watch_node_domainstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "unknown"
if data != self.domain_state:
self.domain_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.monitoring.health", self.name)
)
def watch_node_health(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 100
try:
data = int(data)
except ValueError:
pass
if data != self.health:
self.health = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.free", self.name)
)
def watch_node_memfree(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.used", self.name)
)
def watch_node_memused(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.allocated", self.name)
)
def watch_node_memalloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
)
def watch_node_vcpualloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.running_domains", self.name)
)
def watch_node_runningdomains(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii").split()
except AttributeError:
data = []
if len(data) != self.active_domains_count:
self.active_domains_count = len(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
)
def watch_node_domainscount(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.provisioned_domains_count:
self.provisioned_domains_count = data
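Every watcher in this class follows the same kazoo DataWatch idiom: a decorated callback fires on each change to a znode, and returning False cancels the watch (as done above when the key is deleted). A self-contained sketch of that idiom; the ZooKeeper address and znode path are illustrative:

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

state = {"daemon_state": "stop"}

@zk.DataWatch("/node/hv1/state/daemon")  # hypothetical path
def watch_daemon_state(data, stat, event=None):
    if event and event.type == "DELETED":
        return False  # terminate this watcher
    try:
        value = data.decode("ascii")
    except AttributeError:  # data is None when the znode does not exist yet
        value = "stop"
    if value != state["daemon_state"]:
        state["daemon_state"] = value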

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
# zookeeper.py - Utility functions for pvchealthd Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out(
"Connecting to Zookeeper on coordinator nodes {}".format(
config["coordinators"]
),
state="i",
)
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out(
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
)
os._exit(1)
logger.out("Validating Zookeeper schema", state="i")
try:
node_schema_version = int(
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
)
except Exception:
node_schema_version = int(zkhandler.read("base.schema.version"))
zkhandler.write(
[
(
("node.data.active_schema", config["node_hostname"]),
node_schema_version,
)
]
)
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest installed schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
zkhandler.write(
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
)
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children("base.node"):
node_latest_schema_version.append(
int(zkhandler.read(("node.data.latest_schema", node)))
)
# This is true if all nodes' reported latest schema versions are identical to the latest version,
# i.e. they have all had the latest schema installed and are ready to load it.
if node_latest_schema_version.count(latest_schema_version) == len(
node_latest_schema_version
):
zkhandler.write([("base.schema.version", latest_schema_version)])
return zkhandler, node_schema_version
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out("Found schema violations, applying", state="i")
zkhandler.schema.apply(zkhandler)
else:
logger.out("Schema successfully validated", state="o")
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config["daemon_mode"] == "coordinator":
init_routerstate = "secondary"
else:
init_routerstate = "client"
if zkhandler.exists(("node", config["node_hostname"])):
logger.out(
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
)
# Update static data just in case it's changed
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
]
)
else:
logger.out(
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
state="i",
)
keepalive_time = int(time.time())
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.domain", config["node_hostname"]), "flushed"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
(("node.memory.total", config["node_hostname"]), "0"),
(("node.memory.used", config["node_hostname"]), "0"),
(("node.memory.free", config["node_hostname"]), "0"),
(("node.memory.allocated", config["node_hostname"]), "0"),
(("node.memory.provisioned", config["node_hostname"]), "0"),
(("node.vcpu.allocated", config["node_hostname"]), "0"),
(("node.cpu.load", config["node_hostname"]), "0.0"),
(("node.running_domains", config["node_hostname"]), "0"),
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
(("node.count.networks", config["node_hostname"]), "0"),
]
)
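The schema-consensus step above only bumps the cluster-wide schema version once every node reports the same latest installed version; the count()==len() test is equivalent to all(). A tiny sketch of just that check:

def ready_to_upgrade(node_versions, latest):
    # node_versions: the latest schema version reported by each node
    return node_versions.count(latest) == len(node_versions)

assert ready_to_upgrade([9, 9, 9], 9) is True
assert ready_to_upgrade([9, 8, 9], 9) is False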

@@ -20,14 +20,12 @@
 ###############################################################################

 import pvcnoded.util.keepalive
-import pvcnoded.util.config
 import pvcnoded.util.fencing
 import pvcnoded.util.networking
 import pvcnoded.util.services
 import pvcnoded.util.libvirt
 import pvcnoded.util.zookeeper

-import pvcnoded.objects.MonitoringInstance as MonitoringInstance
 import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
 import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
 import pvcnoded.objects.VMInstance as VMInstance
@@ -36,6 +34,7 @@ import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
 import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
 import pvcnoded.objects.CephInstance as CephInstance

+import daemon_lib.config as cfg
 import daemon_lib.log as log
 import daemon_lib.common as common
@@ -59,41 +58,40 @@ version = "0.9.82"
 def entrypoint():
     keepalive_timer = None
-    monitoring_instance = None

     # Get our configuration
-    config = pvcnoded.util.config.get_configuration()
+    config = cfg.get_configuration()
     config["daemon_name"] = "pvcnoded"
     config["daemon_version"] = version

     # Create and validate our directories
-    pvcnoded.util.config.validate_directories(config)
+    cfg.validate_directories(config)

     # Set up the logger instance
     logger = log.Logger(config)

     # Print our startup message
     logger.out("")
-    logger.out("|------------------------------------------------------------|")
-    logger.out("| |")
-    logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
-    logger.out("| ██ ▜█▙ ▟█▛ ██ |")
-    logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
-    logger.out("| ██ ▜█▙▟█▛ ███████████ |")
-    logger.out("| |")
-    logger.out("|------------------------------------------------------------|")
-    logger.out("| Parallel Virtual Cluster node daemon v{0: <20} |".format(version))
-    logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
-    logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
-    logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
-    logger.out("| ID: {0: <54} |".format(config["node_id"]))
-    logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
-    logger.out("| Machine details: |")
-    logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
-    logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
-    logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
-    logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
-    logger.out("|------------------------------------------------------------|")
+    logger.out("|--------------------------------------------------------------|")
+    logger.out("| |")
+    logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
+    logger.out("| ██ ▜█▙ ▟█▛ ██ |")
+    logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
+    logger.out("| ██ ▜█▙▟█▛ ███████████ |")
+    logger.out("| |")
+    logger.out("|--------------------------------------------------------------|")
+    logger.out("| Parallel Virtual Cluster node daemon v{0: <22} |".format(version))
+    logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
+    logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
+    logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
+    logger.out("| ID: {0: <56} |".format(config["node_id"]))
+    logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
+    logger.out("| Machine details: |")
+    logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
+    logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
+    logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
+    logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
+    logger.out("|--------------------------------------------------------------|")
     logger.out("")
     logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
@@ -202,7 +200,7 @@ def entrypoint():
     # Define a cleanup function
     def cleanup(failure=False):
-        nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
+        nonlocal logger, zkhandler, keepalive_timer, d_domain

         logger.out("Terminating pvcnoded and cleaning up", state="s")
@@ -251,13 +249,6 @@ def entrypoint():
         except Exception:
             pass

-        # Shut down the monitoring system
-        try:
-            logger.out("Shutting down monitoring subsystem", state="s")
-            monitoring_instance.shutdown()
-        except Exception:
-            pass
-
         # Set stop state in Zookeeper
         zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@@ -395,8 +386,8 @@ def entrypoint():
             node_list = new_node_list
             logger.out(
-                f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}',
-                state="i",
+                f'{logger.fmt_cyan}Node list:{logger.fmt_end} {" ".join(node_list)}',
+                state="s",
             )

             # Update node objects lists
@@ -563,8 +554,8 @@ def entrypoint():
             # Update the new list
             network_list = new_network_list
             logger.out(
-                f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}',
-                state="i",
+                f'{logger.fmt_cyan}Network list:{logger.fmt_end} {" ".join(network_list)}',
+                state="s",
             )

             # Update node objects list
@@ -894,8 +885,8 @@ def entrypoint():
             sriov_vf_list = sorted(new_sriov_vf_list)
             logger.out(
-                f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
-                state="i",
+                f'{logger.fmt_cyan}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
+                state="s",
             )

     if config["enable_hypervisor"]:
@@ -921,8 +912,8 @@ def entrypoint():
             # Update the new list
             domain_list = new_domain_list
             logger.out(
-                f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
-                state="i",
+                f'{logger.fmt_cyan}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
+                state="s",
             )

             # Update node objects' list
@@ -948,8 +939,8 @@ def entrypoint():
             # Update the new list
             osd_list = new_osd_list
             logger.out(
-                f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
-                state="i",
+                f'{logger.fmt_cyan}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
+                state="s",
            )

     # Pool objects
@@ -973,8 +964,8 @@ def entrypoint():
             # Update the new list
             pool_list = new_pool_list
             logger.out(
-                f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
-                state="i",
+                f'{logger.fmt_cyan}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
+                state="s",
             )

     # Volume objects (in each pool)
@@ -1005,15 +996,10 @@ def entrypoint():
                 # Update the new list
                 volume_list[pool] = new_volume_list
                 logger.out(
-                    f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
-                    state="i",
+                    f'{logger.fmt_cyan}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
+                    state="s",
                 )

-    # Set up the node monitoring instance and thread
-    monitoring_instance = MonitoringInstance.MonitoringInstance(
-        zkhandler, config, logger, this_node
-    )
-
     # Start keepalived thread
     keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
         logger, config, zkhandler, this_node
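After this refactor all three daemons (pvcnoded, pvchealthd, pvcworkerd) share the same startup shape: one config parser in daemon_lib, with each daemon stamping its own name and version onto the result. A sketch of that shared pattern, runnable only within this repository's environment:

import daemon_lib.config as cfg

version = "0.9.82"

config = cfg.get_configuration()      # parses the file named by PVC_CONFIG_FILE
config["daemon_name"] = "pvcnoded"    # "pvchealthd" / "pvcworkerd" likewise
config["daemon_version"] = version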

@@ -131,11 +131,11 @@ class MetadataAPIInstance(object):
     # Helper functions
     def open_database(self):
         conn = psycopg2.connect(
-            host=self.config["metadata_postgresql_host"],
-            port=self.config["metadata_postgresql_port"],
-            dbname=self.config["metadata_postgresql_dbname"],
-            user=self.config["metadata_postgresql_user"],
-            password=self.config["metadata_postgresql_password"],
+            host=self.config["api_postgresql_host"],
+            port=self.config["api_postgresql_port"],
+            dbname=self.config["api_postgresql_dbname"],
+            user=self.config["api_postgresql_user"],
+            password=self.config["api_postgresql_password"],
         )
         cur = conn.cursor(cursor_factory=RealDictCursor)
         return conn, cur
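Only the config key names change here; the connection pattern stays the same. For reference, an illustrative use of a RealDictCursor connection like the one open_database() returns, where rows come back as dictionaries keyed by column name (credentials and the table are hypothetical):

import psycopg2
from psycopg2.extras import RealDictCursor

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="pvcapi",   # assumed database name
    user="pvcapi",
    password="secret",
)
cur = conn.cursor(cursor_factory=RealDictCursor)
cur.execute("SELECT id, name FROM profile")  # hypothetical table
for row in cur.fetchall():
    print(row["id"], row["name"])  # dict-style access, not tuple indexing
cur.close()
conn.close()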

@@ -700,6 +700,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
     active_coordinator_state = this_node.coordinator_state

     runtime_start = datetime.now()
+    logger.out(
+        "Starting node keepalive run",
+        state="t",
+    )

     # Set the migration selector in Zookeeper for clients to read
     if config["enable_hypervisor"]:

@@ -290,7 +290,7 @@ logging:
   # Enable or disable cluster detail logging during keepalive events
   log_cluster_details: yes

-  # Enable or disable monitoring detail logging during keepalive events
+  # Enable or disable monitoring detail logging during healthcheck events
   log_monitoring_details: yes

   # Number of VM console log lines to store in Zookeeper (per VM)

worker-daemon/daemon_lib Symbolic link

@@ -0,0 +1 @@
../daemon-common

worker-daemon/pvcworkerd.py Executable file

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvcworkerd.py - Worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcworkerd.Daemon # noqa: F401
pvcworkerd.Daemon.entrypoint()

@@ -0,0 +1,20 @@
# Parallel Virtual Cluster worker daemon unit file

[Unit]
Description = Parallel Virtual Cluster worker daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcworkerd.sh
ExecStopPost = /bin/sleep 2
Restart = on-failure

[Install]
WantedBy = pvc.target

@@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
 # app arguments work in a non-backwards-compatible way with Celery 5.
 case "$( cat /etc/debian_version )" in
     10.*)
-        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
+        CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
         ;;
     *)
-        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
+        CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
         ;;
 esac

@@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Daemon.py - PVC Node Worker daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from celery import Celery
import daemon_lib.config as cfg
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import (
worker_run_benchmark,
)
from daemon_lib.vmbuilder import (
worker_create_vm,
)
version = "0.9.82"
config = cfg.get_configuration()
config["daemon_name"] = "pvcworkerd"
config["daemon_version"] = version
celery_task_uri = "redis://{}:{}{}".format(
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
"pvcworkerd",
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return worker_create_vm(
self,
config,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
def entrypoint():
pass
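With the tasks above registered under explicit names and routed per-node via the --queues $(hostname -s) worker argument, a client such as the Flask API can dispatch work to a specific node's queue manually. A hedged sketch using Celery's standard send_task API; the broker URI, hostname, and task arguments are illustrative:

from celery import Celery

celery_task_uri = "redis://10.0.0.1:6379/0"  # assumed KeyDB/Redis URI
celery = Celery("pvcworkerd", broker=celery_task_uri, backend=celery_task_uri)

result = celery.send_task(
    "vm.flush_locks",  # task name as registered above
    kwargs={"domain": "testvm", "force_unlock": False, "run_on": "hv1"},
    queue="hv1",       # route to that node's worker queue
)
print(result.get(timeout=60))  # block until the task completes

The entrypoint() stub is deliberately empty: the worker process is owned by Celery itself, launched through pvcworkerd.sh with the --app pvcworkerd.Daemon.celery arguments shown earlier.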
