Compare commits

..

40 Commits

Author SHA1 Message Date
102c3c3106 Port all Celery worker functions to discrete pkg
Moves all tasks run by the Celery worker into a discrete package/module
for easier installation. Also adjusts several parameters throughout to
accomplish this.
2023-11-30 02:24:54 -05:00
0c0fb65c62 Rework Flask API to route Celery tasks manually
Avoids needing to define any of these tasks here; they can all be
defined in the pvcworkerd code.
2023-11-30 00:40:09 -05:00
03a738f878 Move config parser into daemon_lib
And reformat/add config values for API.
2023-11-30 00:05:37 -05:00
4df5fdbca6 Update description of example conf 2023-11-29 21:21:51 -05:00
97eb63ebab Clean up config naming and dead files 2023-11-29 21:21:51 -05:00
4a2eba0961 Improve node output messages (from pvchealthd)
1. Output startup "list" entries in cyan with s state
2. Add start of keepalive run message
2023-11-29 21:21:51 -05:00
077dd8708f Add check start message 2023-11-29 21:21:51 -05:00
b6b5786c3b Output list in cyan (s state) 2023-11-29 21:21:51 -05:00
ad738dec40 Clean up plugin pycache too 2023-11-29 21:21:51 -05:00
d2b764a2c7 Output more details on startup 2023-11-29 21:21:51 -05:00
b8aecd9c83 Wait less time between restarts 2023-11-29 21:21:51 -05:00
11db3c5b20 Fix ordering during termination 2023-11-29 21:21:51 -05:00
7a7c975eff Ensure return from health shutdown 2023-11-29 21:21:51 -05:00
fa12a3c9b1 Permit buffered log appending 2023-11-29 21:21:51 -05:00
787f4216b3 Expand Zookeeper log daemon prefix to match 2023-11-29 21:21:51 -05:00
647cba3cf5 Expand startup width for new daemon name 2023-11-29 21:21:51 -05:00
921ecb3a05 Fix name in kydb plugin 2023-11-29 21:21:51 -05:00
6a68cf665b Wait between service restarts 2023-11-29 21:21:51 -05:00
41f4e4fb2f Split health monitoring into discrete daemon/pkg 2023-11-29 21:21:51 -05:00
74a416165d Move default autobackup config to pvc.conf 2023-11-29 21:21:37 -05:00
83ceb41138 Add daemon name to Logger entries 2023-11-29 15:18:37 -05:00
2e5958640a Remove erroneous time from message 2023-11-29 15:12:41 -05:00
d65b18f15b Improve handling of loglines on client 2023-11-29 15:12:41 -05:00
7abc697c8a Improve Zookeeper log handling
Ensures that messages are fully read before each append. Adds more
Zookeeper hits, but ensures logs won't be overwritten by multiple
daemons.

Also don't use a set on the client side, to avoid "removing duplicate"
entries erroneously.
2023-11-29 15:12:41 -05:00
1adc3674b6 Install everything uploaded 2023-11-29 15:12:28 -05:00
bd811408f9 Remove "Python 3" from package descriptions 2023-11-29 15:12:09 -05:00
6090b286fe Improve copying to avoid leaving stale files 2023-11-29 14:46:59 -05:00
2545a7b744 Allow similar for IPMI hostnames 2023-11-28 16:09:01 -05:00
ce907ff26a Allow specifying static IPs instead of a file 2023-11-28 15:28:31 -05:00
71e589e461 Remove superfluous debug output
This is printed in the startup logo block anyways.
2023-11-27 13:46:30 -05:00
fc3d292081 Add missing subdirectory configs 2023-11-27 13:40:07 -05:00
eab1ae873b Ensure upstream_gateway key will exist 2023-11-27 13:37:57 -05:00
eaf93cdf96 Readd missing subsystem configurations 2023-11-27 13:33:41 -05:00
c8f4cbb39e Fix node entry keys 2023-11-27 13:24:01 -05:00
786fae7769 Improve logo output 2023-11-27 13:01:43 -05:00
17f81e8296 Refactor pvcapid to use new configuration 2023-11-27 12:49:26 -05:00
bcc57638a9 Refactor pvcnoded to use new configuration 2023-11-26 15:41:25 -05:00
a593ee9c2e Reorganize and add more configuration items 2023-11-26 15:32:53 -05:00
2666e0603e Update dnsmasq script to use new config file 2023-11-26 14:18:13 -05:00
dab7396196 Move to unified pvc.conf configuration file 2023-11-26 14:16:21 -05:00
72 changed files with 2054 additions and 918 deletions

View File

@ -1,80 +0,0 @@
---
# pvcapid configuration file example
#
# This configuration file specifies details for the PVC API daemon running on
# this machine. Default values are not supported; the values in this sample
# configuration are considered defaults and can be used as-is.
#
# Copy this example to /etc/pvc/pvcapid.conf and edit to your needs

pvc:
  # debug: Enable/disable API debug mode
  debug: True
  # coordinators: The list of cluster coordinator hostnames
  coordinators:
    - pvchv1
    - pvchv2
    - pvchv3
  # api: Configuration of the API listener
  api:
    # listen_address: IP address(es) to listen on; use 0.0.0.0 for all interfaces
    listen_address: "127.0.0.1"
    # listen_port: TCP port to listen on, usually 7370
    listen_port: "7370"
    # authentication: Authentication and security settings
    authentication:
      # enabled: Enable or disable authentication (True/False)
      enabled: False
      # secret_key: Per-cluster secret key for API cookies; generate with uuidgen or pwgen
      secret_key: ""
      # tokens: a list of authentication tokens; leave as an empty list to disable authentication
      tokens:
        # description: token description for management
        - description: "testing"
          # token: random token for authentication; generate with uuidgen or pwgen
          token: ""
    # ssl: SSL configuration
    ssl:
      # enabled: Enable or disable SSL operation (True/False)
      enabled: False
      # cert_file: SSL certificate file
      cert_file: ""
      # key_file: SSL certificate key file
      key_file: ""
  # provisioner: Configuration of the Provisioner API listener
  provisioner:
    # database: Backend database configuration
    database:
      # host: PostgreSQL hostname, usually 'localhost'
      host: localhost
      # port: PostgreSQL port, invariably '5432'
      port: 5432
      # name: PostgreSQL database name, invariably 'pvcapi'
      name: pvcapi
      # user: PostgreSQL username, invariably 'pvcapi'
      user: pvcapi
      # pass: PostgreSQL user password, randomly generated
      pass: pvcapi
    # queue: Celery backend queue using the PVC Zookeeper cluster
    queue:
      # host: Redis hostname, usually 'localhost'
      host: localhost
      # port: Redis port, invariably '6379'
      port: 6379
      # path: Redis queue path, invariably '/0'
      path: /0
    # ceph_cluster: Information about the Ceph storage cluster
    ceph_cluster:
      # storage_hosts: The list of hosts that the Ceph monitors are valid on; if empty (the default),
      #                uses the list of coordinators
      storage_hosts:
        - pvchv1
        - pvchv2
        - pvchv3
      # storage_domain: The storage domain name, concatenated with the coordinators list names
      #                 to form monitor access strings
      storage_domain: "pvc.storage"
      # ceph_monitor_port: The port that the Ceph monitor on each coordinator listens on
      ceph_monitor_port: 6789
      # ceph_storage_secret_uuid: Libvirt secret UUID for Ceph storage access
      ceph_storage_secret_uuid: ""
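
For orientation while reading the rest of this diff: the nested keys removed here correspond to the flat key names produced by the new unified parser in daemon_lib further down (for example, pvc.api.listen_address becomes api_listen_address). A small illustrative sketch of that correspondence; the helper name and the exact mapping table are illustrative, not part of the PVC codebase:

# Illustrative only: pairs a few of the old nested pvcapid.yaml keys with the
# flat key names used by the reworked daemons elsewhere in this diff.
# flatten_legacy_config() is a hypothetical helper, not part of PVC itself.
import yaml

OLD_TO_NEW = {
    ("pvc", "api", "listen_address"): "api_listen_address",
    ("pvc", "api", "listen_port"): "api_listen_port",
    ("pvc", "api", "authentication", "enabled"): "api_auth_enabled",
    ("pvc", "api", "ssl", "enabled"): "api_ssl_enabled",
    ("pvc", "provisioner", "database", "host"): "api_postgresql_host",
    ("pvc", "provisioner", "queue", "host"): "keydb_host",
}

def flatten_legacy_config(path):
    with open(path, "r") as fh:
        nested = yaml.safe_load(fh)
    flat = {}
    for old_path, new_key in OLD_TO_NEW.items():
        value = nested
        for part in old_path:
            value = value[part]
        flat[new_key] = value
    return flat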

View File

@ -8,7 +8,7 @@ After = network-online.target
Type = simple Type = simple
WorkingDirectory = /usr/share/pvc WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvcapid.yaml Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStart = /usr/share/pvc/pvcapid.py ExecStart = /usr/share/pvc/pvcapid.py
Restart = on-failure Restart = on-failure

View File

@ -19,13 +19,13 @@
# #
############################################################################### ###############################################################################
import os
import yaml
from ssl import SSLContext, TLSVersion from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg
# Daemon version # Daemon version
version = "0.9.82" version = "0.9.82"
@ -53,67 +53,11 @@ def strtobool(stringv):
# Configuration Parsing # Configuration Parsing
########################################################## ##########################################################
# Parse the configuration file
try:
pvcapid_config_file = os.environ["PVC_CONFIG_FILE"]
except Exception:
print(
'Error: The "PVC_CONFIG_FILE" environment variable must be set before starting pvcapid.'
)
exit(1)
print('Loading configuration from file "{}"'.format(pvcapid_config_file)) # Get our configuration
config = cfg.get_configuration()
# Read in the config config["daemon_name"] = "pvcapid"
try: config["daemon_version"] = version
with open(pvcapid_config_file, "r") as cfgfile:
o_config = yaml.load(cfgfile, Loader=yaml.BaseLoader)
except Exception as e:
print("ERROR: Failed to parse configuration file: {}".format(e))
exit(1)
try:
# Create the config object
config = {
"debug": strtobool(o_config["pvc"]["debug"]),
"coordinators": o_config["pvc"]["coordinators"],
"listen_address": o_config["pvc"]["api"]["listen_address"],
"listen_port": int(o_config["pvc"]["api"]["listen_port"]),
"auth_enabled": strtobool(o_config["pvc"]["api"]["authentication"]["enabled"]),
"auth_secret_key": o_config["pvc"]["api"]["authentication"]["secret_key"],
"auth_tokens": o_config["pvc"]["api"]["authentication"]["tokens"],
"ssl_enabled": strtobool(o_config["pvc"]["api"]["ssl"]["enabled"]),
"ssl_key_file": o_config["pvc"]["api"]["ssl"]["key_file"],
"ssl_cert_file": o_config["pvc"]["api"]["ssl"]["cert_file"],
"database_host": o_config["pvc"]["provisioner"]["database"]["host"],
"database_port": int(o_config["pvc"]["provisioner"]["database"]["port"]),
"database_name": o_config["pvc"]["provisioner"]["database"]["name"],
"database_user": o_config["pvc"]["provisioner"]["database"]["user"],
"database_password": o_config["pvc"]["provisioner"]["database"]["pass"],
"queue_host": o_config["pvc"]["provisioner"]["queue"]["host"],
"queue_port": o_config["pvc"]["provisioner"]["queue"]["port"],
"queue_path": o_config["pvc"]["provisioner"]["queue"]["path"],
"storage_hosts": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_hosts"
],
"storage_domain": o_config["pvc"]["provisioner"]["ceph_cluster"][
"storage_domain"
],
"ceph_monitor_port": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_monitor_port"
],
"ceph_storage_secret_uuid": o_config["pvc"]["provisioner"]["ceph_cluster"][
"ceph_storage_secret_uuid"
],
}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"]:
config["storage_hosts"] = config["coordinators"]
except Exception as e:
print("ERROR: Failed to load configuration: {}".format(e))
exit(1)
########################################################## ##########################################################
@ -124,41 +68,43 @@ except Exception as e:
def entrypoint(): def entrypoint():
import pvcapid.flaskapi as pvc_api # noqa: E402 import pvcapid.flaskapi as pvc_api # noqa: E402
if config["ssl_enabled"]: if config["api_ssl_enabled"]:
context = SSLContext() context = SSLContext()
context.minimum_version = TLSVersion.TLSv1 context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs() context.get_ca_certs()
context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"]) context.load_cert_chain(
config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
)
else: else:
context = None context = None
# Print our startup messages # Print our startup messages
print("") print("")
print("|----------------------------------------------------------|") print("|--------------------------------------------------------------|")
print("| |") print("| |")
print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |") print("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
print("| ██ ▜█▙ ▟█▛ ██ |") print("| ██ ▜█▙ ▟█▛ ██ |")
print("| ███████████ ▜█▙ ▟█▛ ██ |") print("| ███████████ ▜█▙ ▟█▛ ██ |")
print("| ██ ▜█▙▟█▛ ███████████ |") print("| ██ ▜█▙▟█▛ ███████████ |")
print("| |") print("| |")
print("|----------------------------------------------------------|") print("|--------------------------------------------------------------|")
print("| Parallel Virtual Cluster API daemon v{0: <19} |".format(version)) print("| Parallel Virtual Cluster API daemon v{0: <23} |".format(version))
print("| Debug: {0: <49} |".format(str(config["debug"]))) print("| Debug: {0: <53} |".format(str(config["debug"])))
print("| API version: v{0: <42} |".format(API_VERSION)) print("| API version: v{0: <46} |".format(API_VERSION))
print( print(
"| Listen: {0: <48} |".format( "| Listen: {0: <52} |".format(
"{}:{}".format(config["listen_address"], config["listen_port"]) "{}:{}".format(config["api_listen_address"], config["api_listen_port"])
) )
) )
print("| SSL: {0: <51} |".format(str(config["ssl_enabled"]))) print("| SSL: {0: <55} |".format(str(config["api_ssl_enabled"])))
print("| Authentication: {0: <40} |".format(str(config["auth_enabled"]))) print("| Authentication: {0: <44} |".format(str(config["api_auth_enabled"])))
print("|----------------------------------------------------------|") print("|--------------------------------------------------------------|")
print("") print("")
pvc_api.celery_startup() pvc_api.celery_startup()
pvc_api.app.run( pvc_api.app.run(
config["listen_address"], config["api_listen_address"],
config["listen_port"], config["api_listen_port"],
threaded=True, threaded=True,
ssl_context=context, ssl_context=context,
) )
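
In condensed form, the new startup flow shown above: pvcapid no longer parses its own YAML, it pulls the unified configuration from daemon_lib, stamps in its own identity, and only then builds the optional SSL context. A minimal sketch, assuming daemon_lib.config is importable; key names match the diff, everything else is boilerplate:

# Minimal sketch of the configuration-consumption pattern in the diff above.
from ssl import SSLContext, TLSVersion

import daemon_lib.config as cfg

version = "0.9.82"

# Pull the unified configuration and tag it with this daemon's identity
config = cfg.get_configuration()
config["daemon_name"] = "pvcapid"
config["daemon_version"] = version

# Build the SSL context only if the unified config enables it
if config["api_ssl_enabled"]:
    context = SSLContext()
    context.minimum_version = TLSVersion.TLSv1
    context.load_cert_chain(
        config["api_ssl_cert_file"], keyfile=config["api_ssl_key_file"]
    )
else:
    context = None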

View File

@ -31,25 +31,12 @@ from uuid import uuid4
from daemon_lib.common import getPrimaryNode from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection from daemon_lib.zkhandler import ZKConnection
from daemon_lib.node import get_list as get_node_list from daemon_lib.node import get_list as get_node_list
from daemon_lib.vm import ( from daemon_lib.benchmark import list_benchmarks
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from pvcapid.Daemon import config, strtobool, API_VERSION from pvcapid.Daemon import config, strtobool, API_VERSION
import pvcapid.helper as api_helper import pvcapid.helper as api_helper
import pvcapid.provisioner as api_provisioner import pvcapid.provisioner as api_provisioner
import pvcapid.vmbuilder as api_vmbuilder
import pvcapid.benchmark as api_benchmark
import pvcapid.ova as api_ova import pvcapid.ova as api_ova
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
@ -61,11 +48,11 @@ app = flask.Flask(__name__)
# Set up SQLAlchemy backend # Set up SQLAlchemy backend
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format( app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
config["database_user"], config["api_postgresql_user"],
config["database_password"], config["api_postgresql_password"],
config["database_host"], config["api_postgresql_host"],
config["database_port"], config["api_postgresql_port"],
config["database_name"], config["api_postgresql_dbname"],
) )
if config["debug"]: if config["debug"]:
@ -73,8 +60,8 @@ if config["debug"]:
else: else:
app.config["DEBUG"] = False app.config["DEBUG"] = False
if config["auth_enabled"]: if config["api_auth_enabled"]:
app.config["SECRET_KEY"] = config["auth_secret_key"] app.config["SECRET_KEY"] = config["api_auth_secret_key"]
# Create SQLAlchemy database # Create SQLAlchemy database
db = SQLAlchemy(app) db = SQLAlchemy(app)
@ -99,41 +86,34 @@ def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler) return getPrimaryNode(zkhandler)
# Set up Celery queue routing # Set up Celery task ID generator
def route_task(name, args, kwargs, options, task=None, **kw): # 1. Lets us make our own IDs (first section of UUID)
print("----") # 2. Lets us distribute jobs to the required pvcworkerd instances
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}") def run_celery_task(task_name, **kwargs):
task_id = str(uuid4()).split("-")[0]
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue if "run_on" in kwargs and kwargs["run_on"] != "primary":
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys(): run_on = kwargs["run_on"]
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
else: else:
run_on = get_primary_node() run_on = get_primary_node()
print(f"Selected Celery worker: {run_on}") print(
print("----") f"Incoming pvcworkerd task: '{task_name}' ({task_id}) assigned to worker {run_on} with args {kwargs}"
return run_on
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
) )
task = celery.send_task(
task_name,
task_id=task_id,
kwargs=kwargs,
queue=run_on,
)
return task return task
# Create celery definition # Create celery definition
celery_task_uri = "redis://{}:{}{}".format( celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"] config["keydb_host"], config["keydb_port"], config["keydb_path"]
) )
celery = Celery( celery = Celery(
app.name, app.name,
@ -153,7 +133,6 @@ def celery_startup():
app.config["task_queues"] = tuple( app.config["task_queues"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()] [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
) )
app.config["task_routes"] = (route_task,)
celery.conf.update(app.config) celery.conf.update(app.config)
@ -199,7 +178,7 @@ def Authenticator(function):
@wraps(function) @wraps(function)
def authenticate(*args, **kwargs): def authenticate(*args, **kwargs):
# No authentication required # No authentication required
if not config["auth_enabled"]: if not config["api_auth_enabled"]:
return function(*args, **kwargs) return function(*args, **kwargs)
# Session-based authentication # Session-based authentication
if "token" in flask.session: if "token" in flask.session:
@ -208,7 +187,7 @@ def Authenticator(function):
if "X-Api-Key" in flask.request.headers: if "X-Api-Key" in flask.request.headers:
if any( if any(
token token
for token in config["auth_tokens"] for token in config["api_auth_tokens"]
if flask.request.headers.get("X-Api-Key") == token.get("token") if flask.request.headers.get("X-Api-Key") == token.get("token")
): ):
return function(*args, **kwargs) return function(*args, **kwargs)
@ -220,171 +199,6 @@ def Authenticator(function):
return authenticate return authenticate
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return api_vmbuilder.create_vm(
self,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def run_benchmark(self, pool=None, run_on="primary"):
return api_benchmark.run_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
########################################################## ##########################################################
# API Root/Authentication # API Root/Authentication
########################################################## ##########################################################
@ -469,12 +283,12 @@ class API_Login(Resource):
type: object type: object
id: Message id: Message
""" """
if not config["auth_enabled"]: if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root)) return flask.redirect(Api.url_for(api, API_Root))
if any( if any(
token token
for token in config["auth_tokens"] for token in config["api_auth_tokens"]
if flask.request.values["token"] in token["token"] if flask.request.values["token"] in token["token"]
): ):
flask.session["token"] = flask.request.form["token"] flask.session["token"] = flask.request.form["token"]
@ -503,7 +317,7 @@ class API_Logout(Resource):
302: 302:
description: Authentication disabled description: Authentication disabled
""" """
if not config["auth_enabled"]: if not config["api_auth_enabled"]:
return flask.redirect(Api.url_for(api, API_Root)) return flask.redirect(Api.url_for(api, API_Root))
flask.session.pop("token", None) flask.session.pop("token", None)
@ -2459,7 +2273,7 @@ class API_VM_Locks(Resource):
else: else:
return vm_node_detail, retcode return vm_node_detail, retcode
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node) task = run_celery_task("vm.flush_locks", domain=vm, run_on=vm_node)
return ( return (
{"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node}, {"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
@ -2594,7 +2408,7 @@ class API_VM_Device(Resource):
else: else:
return vm_node_detail, retcode return vm_node_detail, retcode
task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node) task = run_celery_task("vm.device_attach", domain=vm, xml=xml, run_on=vm_node)
return ( return (
{"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node}, {"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
@ -2648,7 +2462,7 @@ class API_VM_Device(Resource):
else: else:
return vm_node_detail, retcode return vm_node_detail, retcode
task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node) task = run_celery_task("vm.device_detach", domain=vm, xml=xml, run_on=vm_node)
return ( return (
{"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node}, {"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
@ -4364,7 +4178,7 @@ class API_Storage_Ceph_Benchmark(Resource):
type: string (integer) type: string (integer)
description: The number of minor page faults during the test description: The number of minor page faults during the test
""" """
return api_benchmark.list_benchmarks(reqargs.get("job", None)) return list_benchmarks(config, reqargs.get("job", None))
@RequestParser( @RequestParser(
[ [
@ -4405,7 +4219,7 @@ class API_Storage_Ceph_Benchmark(Resource):
}, 400 }, 400
task = run_celery_task( task = run_celery_task(
run_benchmark, pool=reqargs.get("pool", None), run_on="primary" "storage.benchmark", pool=reqargs.get("pool", None), run_on="primary"
) )
return ( return (
{ {
@ -4531,7 +4345,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
node = reqargs.get("node", None) node = reqargs.get("node", None)
task = run_celery_task( task = run_celery_task(
osd_add_db_vg, device=reqargs.get("device", None), run_on=node "osd.add_db_vg", device=reqargs.get("device", None), run_on=node
) )
return ( return (
@ -4730,7 +4544,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
node = reqargs.get("node", None) node = reqargs.get("node", None)
task = run_celery_task( task = run_celery_task(
osd_add, "osd.add",
device=reqargs.get("device", None), device=reqargs.get("device", None),
weight=reqargs.get("weight", None), weight=reqargs.get("weight", None),
ext_db_ratio=reqargs.get("ext_db_ratio", None), ext_db_ratio=reqargs.get("ext_db_ratio", None),
@ -4849,7 +4663,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode return osd_node_detail, retcode
task = run_celery_task( task = run_celery_task(
osd_replace, "osd.replace",
osd_id=osdid, osd_id=osdid,
new_device=reqargs.get("new_device"), new_device=reqargs.get("new_device"),
old_device=reqargs.get("old_device", None), old_device=reqargs.get("old_device", None),
@ -4907,7 +4721,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode return osd_node_detail, retcode
task = run_celery_task( task = run_celery_task(
osd_refresh, "osd.refresh",
osd_id=osdid, osd_id=osdid,
device=reqargs.get("device", None), device=reqargs.get("device", None),
ext_db_flag=False, ext_db_flag=False,
@ -4978,7 +4792,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
return osd_node_detail, retcode return osd_node_detail, retcode
task = run_celery_task( task = run_celery_task(
osd_remove, "osd.remove",
osd_id=osdid, osd_id=osdid,
force_flag=reqargs.get("force", False), force_flag=reqargs.get("force", False),
run_on=node, run_on=node,
@ -8507,7 +8321,7 @@ class API_Provisioner_Create_Root(Resource):
start_vm = False start_vm = False
task = run_celery_task( task = run_celery_task(
create_vm, "provisioner.create",
vm_name=reqargs.get("name", None), vm_name=reqargs.get("name", None),
profile_name=reqargs.get("profile", None), profile_name=reqargs.get("profile", None),
define_vm=define_vm, define_vm=define_vm,
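
The run_celery_task() helper above is the heart of this change: the Flask API no longer defines any Celery task functions itself, it simply sends a task by name to the queue of the node that should run it, and pvcworkerd owns the implementations. A self-contained sketch of that dispatch pattern; the broker URI, node names, and the commented usage are placeholders, only the pattern (short custom task IDs plus a per-node queue) mirrors the diff:

# Condensed sketch of send-by-name Celery dispatch with per-node queues.
from uuid import uuid4

from celery import Celery
from kombu import Queue

celery = Celery(
    "pvcapid",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
# One queue per node, so a task can be pinned to a specific worker
celery.conf.task_queues = tuple(
    Queue(h, routing_key=f"{h}.#") for h in ("hv1", "hv2", "hv3")
)

def run_celery_task(task_name, **kwargs):
    task_id = str(uuid4()).split("-")[0]   # short, human-friendly task ID
    run_on = kwargs.get("run_on", "hv1")   # target worker queue
    return celery.send_task(
        task_name,
        task_id=task_id,
        kwargs=kwargs,
        queue=run_on,
    )

# Example: ask the worker on hv2 to flush locks for a VM
# task = run_celery_task("vm.flush_locks", domain="testvm", run_on="hv2")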

View File

@ -48,11 +48,11 @@ import pvcapid.provisioner as provisioner
# Database connections # Database connections
def open_database(config): def open_database(config):
conn = psycopg2.connect( conn = psycopg2.connect(
host=config["database_host"], host=config["api_postgresql_host"],
port=config["database_port"], port=config["api_postgresql_port"],
dbname=config["database_name"], dbname=config["api_postgresql_name"],
user=config["database_user"], user=config["api_postgresql_user"],
password=config["database_password"], password=config["api_postgresql_password"],
) )
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur return conn, cur

View File

@ -63,11 +63,11 @@ class ProvisioningError(Exception):
# Database connections # Database connections
def open_database(config): def open_database(config):
conn = psycopg2.connect( conn = psycopg2.connect(
host=config["database_host"], host=config["api_postgresql_host"],
port=config["database_port"], port=config["api_postgresql_port"],
dbname=config["database_name"], dbname=config["api_postgresql_dbname"],
user=config["database_user"], user=config["api_postgresql_user"],
password=config["database_password"], password=config["api_postgresql_password"],
) )
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur return conn, cur

View File

@ -1,16 +0,0 @@
# Parallel Virtual Cluster Celery Worker daemon unit file

[Unit]
Description = Parallel Virtual Cluster Celery Worker daemon
After = network-online.target

[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvcapid.yaml
ExecStart = /usr/share/pvc/pvcworkerd.sh
Restart = on-failure

[Install]
WantedBy = multi-user.target

View File

@ -42,23 +42,33 @@ echo " done. Package version ${version}."
# Install the client(s) locally # Install the client(s) locally
echo -n "Installing client packages locally..." echo -n "Installing client packages locally..."
$SUDO dpkg -i ../pvc-client*_${version}*.deb &>/dev/null $SUDO dpkg -i --force-all ../pvc-client*_${version}*.deb &>/dev/null
echo " done". echo " done".
for HOST in ${HOSTS[@]}; do for HOST in ${HOSTS[@]}; do
echo "> Deploying packages to host ${HOST}" echo -n "Copying packages to host ${HOST}..."
echo -n "Copying packages..."
ssh $HOST $SUDO rm -rf /tmp/pvc &>/dev/null ssh $HOST $SUDO rm -rf /tmp/pvc &>/dev/null
ssh $HOST mkdir /tmp/pvc &>/dev/null ssh $HOST mkdir /tmp/pvc &>/dev/null
scp ../pvc-*_${version}*.deb $HOST:/tmp/pvc/ &>/dev/null scp ../pvc-*_${version}*.deb $HOST:/tmp/pvc/ &>/dev/null
echo " done." echo " done."
done
if [[ -z ${KEEP_ARTIFACTS} ]]; then
rm ../pvc*_${version}*
fi
for HOST in ${HOSTS[@]}; do
echo "> Deploying packages to host ${HOST}"
echo -n "Installing packages..." echo -n "Installing packages..."
ssh $HOST $SUDO dpkg -i /tmp/pvc/{pvc-client-cli,pvc-daemon-common,pvc-daemon-api,pvc-daemon-node}*.deb &>/dev/null ssh $HOST $SUDO dpkg -i --force-all /tmp/pvc/*.deb &>/dev/null
ssh $HOST rm -rf /tmp/pvc &>/dev/null ssh $HOST rm -rf /tmp/pvc &>/dev/null
echo " done." echo " done."
echo -n "Restarting PVC daemons..." echo -n "Restarting PVC daemons..."
ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
sleep 2
ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
echo " done." echo " done."
echo -n "Waiting for node daemon to be running..." echo -n "Waiting for node daemon to be running..."
@ -68,8 +78,5 @@ for HOST in ${HOSTS[@]}; do
done done
echo " done." echo " done."
done done
if [[ -z ${KEEP_ARTIFACTS} ]]; then
rm ../pvc*_${version}*
fi
popd &>/dev/null popd &>/dev/null

View File

@ -13,9 +13,11 @@ echo ${new_ver} >&3
tmpdir=$( mktemp -d ) tmpdir=$( mktemp -d )
cp -a debian/changelog client-cli/setup.py ${tmpdir}/ cp -a debian/changelog client-cli/setup.py ${tmpdir}/
cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
cp -a worker-daemon/pvcworkerd/Daemon.py ${tmpdir}/worker-Daemon.py
cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
# Replace the "base" version with the git revision version # Replace the "base" version with the git revision version
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
cat <<EOF > debian/changelog cat <<EOF > debian/changelog
pvc (${new_ver}) unstable; urgency=medium pvc (${new_ver}) unstable; urgency=medium
@ -33,6 +35,8 @@ dpkg-buildpackage -us -uc
cp -a ${tmpdir}/changelog debian/changelog cp -a ${tmpdir}/changelog debian/changelog
cp -a ${tmpdir}/setup.py client-cli/setup.py cp -a ${tmpdir}/setup.py client-cli/setup.py
cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
cp -a ${tmpdir}/worker-Daemon.py worker-daemon/pvcworkerd/Daemon.py
cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
# Clean up # Clean up

View File

@ -20,6 +20,8 @@ changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
rm ${changelog_file} rm ${changelog_file}
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvchealthd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcworkerd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
echo ${new_version} > .version echo ${new_version} > .version
@ -46,7 +48,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
echo -e "${deb_changelog_orig}" >> ${deb_changelog_file} echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
mv ${deb_changelog_file} debian/changelog mv ${deb_changelog_file} debian/changelog
git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py worker-daemon/pvcworkerd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git commit -v git commit -v
popd &>/dev/null popd &>/dev/null

View File

@ -1,52 +0,0 @@
---
# Root level configuration key
autobackup:
  # Backup root path on the node, used as the remote mountpoint
  # Must be an absolute path beginning with '/'
  # If remote_mount is enabled, the remote mount will be mounted on this directory
  # If remote_mount is enabled, it is recommended to use a path under `/tmp` for this
  # If remote_mount is disabled, a real filesystem must be mounted here (PVC system volumes are small!)
  backup_root_path: "/tmp/backups"

  # Suffix to the backup root path, used to allow multiple PVC systems to write to a single root path
  # Must begin with '/'; leave empty to use the backup root path directly
  # Note that most remote mount options can fake this if needed, but provided to ensure local compatibility
  backup_root_suffix: "/mycluster"

  # VM tag(s) to back up
  # Only VMs with at least one of the given tag(s) will be backed up; all others will be skipped
  backup_tags:
    - "backup"
    - "mytag"

  # Backup schedule: when and what format to take backups
  backup_schedule:
    full_interval: 7   # Number of total backups between full backups; others are incremental
                       # > If this number is 1, every backup will be a full backup and no incremental
                       #   backups will be taken
                       # > If this number is 2, every second backup will be a full backup, etc.
    full_retention: 2  # Keep this many full backups; the oldest will be deleted when a new one is
                       # taken, along with all child incremental backups of that backup
                       # > Should usually be at least 2 when using incrementals (full_interval > 1) to
                       #   avoid there being too few backups after cleanup from a new full backup

  # Automatic mount settings
  # These settings permit running an arbitrary set of commands, ideally a "mount" command or similar, to
  # ensure that a remote filesystem is mounted on the backup root path
  # While the examples here show absolute paths, that is not required; they will run with the $PATH of the
  # executing environment (either the "pvc" command on a CLI or a cron/systemd timer)
  # A "{backup_root_path}" f-string/str.format type variable MAY be present in any cmds string to represent
  # the above configured root backup path, which is interpolated at runtime
  # If multiple commands are given, they will be executed in the order given; if no commands are given,
  # nothing is executed, but the keys MUST be present
  auto_mount:
    enabled: no  # Enable automatic mount/unmount support
    # These commands are executed at the start of the backup run and should mount a filesystem
    mount_cmds:
      # This example shows an NFS mount leveraging the backup_root_path variable
      - "/usr/sbin/mount.nfs -o nfsvers=3 10.0.0.10:/backups {backup_root_path}"
    # These commands are executed at the end of the backup run and should unmount a filesystem
    unmount_cmds:
      # This example shows a generic umount leveraging the backup_root_path variable
      - "/usr/bin/umount {backup_root_path}"

View File

@ -44,7 +44,7 @@ DEFAULT_STORE_DATA = {"cfgfile": "/etc/pvc/pvcapid.yaml"}
DEFAULT_STORE_FILENAME = "pvc.json" DEFAULT_STORE_FILENAME = "pvc.json"
DEFAULT_API_PREFIX = "/api/v1" DEFAULT_API_PREFIX = "/api/v1"
DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0] DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0]
DEFAULT_AUTOBACKUP_FILENAME = "/etc/pvc/autobackup.yaml" DEFAULT_AUTOBACKUP_FILENAME = "/etc/pvc/pvc.conf"
MAX_CONTENT_WIDTH = 120 MAX_CONTENT_WIDTH = 120

View File

@ -21,6 +21,8 @@
import time import time
from collections import deque
import pvc.lib.ansiprint as ansiprint import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import call_api from pvc.lib.common import call_api
@ -107,7 +109,9 @@ def follow_node_log(config, node, lines=10):
API schema: {"name":"{nodename}","data":"{node_log}"} API schema: {"name":"{nodename}","data":"{node_log}"}
""" """
# We always grab 200 to match the follow call, but only _show_ `lines` number # We always grab 200 to match the follow call, but only _show_ `lines` number
params = {"lines": 200} max_lines = 200
params = {"lines": max_lines}
response = call_api( response = call_api(
config, "get", "/node/{node}/log".format(node=node), params=params config, "get", "/node/{node}/log".format(node=node), params=params
) )
@ -117,43 +121,50 @@ def follow_node_log(config, node, lines=10):
# Shrink the log buffer to length lines # Shrink the log buffer to length lines
node_log = response.json()["data"] node_log = response.json()["data"]
shrunk_log = node_log.split("\n")[-int(lines) :] full_log = node_log.split("\n")
loglines = "\n".join(shrunk_log) shrunk_log = full_log[-int(lines) :]
# Print the initial data and begin following # Print the initial data and begin following
print(loglines, end="") for line in shrunk_log:
print("\n", end="") print(line)
# Create the deque we'll use to buffer loglines
loglines = deque(full_log, max_lines)
while True: while True:
# Wait half a second
time.sleep(0.5)
# Grab the next line set (200 is a reasonable number of lines per half-second; any more are skipped) # Grab the next line set (200 is a reasonable number of lines per half-second; any more are skipped)
try: try:
params = {"lines": 200} params = {"lines": max_lines}
response = call_api( response = call_api(
config, "get", "/node/{node}/log".format(node=node), params=params config, "get", "/node/{node}/log".format(node=node), params=params
) )
new_node_log = response.json()["data"] new_node_log = response.json()["data"]
except Exception: except Exception:
break break
# Split the new and old log strings into constitutent lines # Split the new and old log strings into constitutent lines
old_node_loglines = node_log.split("\n")
new_node_loglines = new_node_log.split("\n") new_node_loglines = new_node_log.split("\n")
# Set the node log to the new log value for the next iteration # Find where in the new lines the last entryin the ongoing deque is
node_log = new_node_log start_idx = 0
for idx, line in enumerate(new_node_loglines):
if line == loglines[-1]:
start_idx = idx
# Get the difference between the two sets of lines # Get the new lines starting from the found index plus one
old_node_loglines_set = set(old_node_loglines) diff_node_loglines = new_node_loglines[start_idx + 1 :]
diff_node_loglines = [
x for x in new_node_loglines if x not in old_node_loglines_set
]
# If there's a difference, print it out # If there's a difference, add the lines to the ongling deque and print then out
if len(diff_node_loglines) > 0: if len(diff_node_loglines) > 0:
print("\n".join(diff_node_loglines), end="") for line in diff_node_loglines:
print("\n", end="") loglines.append(line)
print(line)
# Wait half a second del new_node_loglines
time.sleep(0.5) del diff_node_loglines
return True, "" return True, ""
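
The reworked follow loop above replaces the old set-difference approach with a bounded deque: remember the lines already printed, find the most recently printed line inside each newly fetched chunk, and print only what follows it. The same strategy in isolation, where fetch_lines() stands in for the node-log API call and at least one line is assumed to be returned:

# Standalone sketch of the deque-based log-follow strategy shown above.
import time
from collections import deque

def follow(fetch_lines, max_lines=200, interval=0.5):
    # Seed the bounded buffer with the current tail of the log and print it
    seen = deque(fetch_lines(max_lines), max_lines)
    for line in seen:
        print(line)
    while True:
        time.sleep(interval)
        new_lines = fetch_lines(max_lines)
        # Locate the last line we have already printed within the new chunk
        start_idx = 0
        for idx, line in enumerate(new_lines):
            if line == seen[-1]:
                start_idx = idx
        # Everything after that index is new; buffer and print it
        for line in new_lines[start_idx + 1:]:
            seen.append(line)
            print(line)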

View File

@ -25,9 +25,6 @@ import psycopg2.extras
from datetime import datetime from datetime import datetime
from json import loads, dumps from json import loads, dumps
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, update, finish from daemon_lib.celery import start, fail, log_info, update, finish
import daemon_lib.common as pvc_common import daemon_lib.common as pvc_common
@ -135,11 +132,11 @@ def cleanup(job_name, db_conn=None, db_cur=None, zkhandler=None):
# Database connections # Database connections
def open_database(config): def open_database(config):
conn = psycopg2.connect( conn = psycopg2.connect(
host=config["database_host"], host=config["api_postgresql_host"],
port=config["database_port"], port=config["api_postgresql_port"],
dbname=config["database_name"], dbname=config["api_postgresql_dbname"],
user=config["database_user"], user=config["api_postgresql_user"],
password=config["database_password"], password=config["api_postgresql_password"],
) )
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cur return conn, cur
@ -152,7 +149,7 @@ def close_database(conn, cur, failed=False):
conn.close() conn.close()
def list_benchmarks(job=None): def list_benchmarks(config, job=None):
if job is not None: if job is not None:
query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks") query = "SELECT * FROM {} WHERE job = %s;".format("storage_benchmarks")
args = (job,) args = (job,)
@ -278,17 +275,8 @@ def run_benchmark_job(
return jstdout return jstdout
def run_benchmark(self, pool): def worker_run_benchmark(zkhandler, celery, config, pool):
# Phase 0 - connect to databases # Phase 0 - connect to databases
try:
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
fail(
self,
"Failed to connect to Zookeeper",
)
cur_time = datetime.now().isoformat(timespec="seconds") cur_time = datetime.now().isoformat(timespec="seconds")
cur_primary = zkhandler.read("base.config.primary_node") cur_primary = zkhandler.read("base.config.primary_node")
job_name = f"{cur_time}_{cur_primary}" job_name = f"{cur_time}_{cur_primary}"
@ -296,7 +284,7 @@ def run_benchmark(self, pool):
current_stage = 0 current_stage = 0
total_stages = 13 total_stages = 13
start( start(
self, celery,
f"Running storage benchmark '{job_name}' on pool '{pool}'", f"Running storage benchmark '{job_name}' on pool '{pool}'",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -312,13 +300,13 @@ def run_benchmark(self, pool):
zkhandler=zkhandler, zkhandler=zkhandler,
) )
fail( fail(
self, celery,
"Failed to connect to Postgres", "Failed to connect to Postgres",
) )
current_stage += 1 current_stage += 1
update( update(
self, celery,
"Storing running status in database", "Storing running status in database",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -340,11 +328,11 @@ def run_benchmark(self, pool):
db_cur=db_cur, db_cur=db_cur,
zkhandler=zkhandler, zkhandler=zkhandler,
) )
fail(self, f"Failed to store running status: {e}", exception=BenchmarkError) fail(celery, f"Failed to store running status: {e}", exception=BenchmarkError)
current_stage += 1 current_stage += 1
update( update(
self, celery,
"Creating benchmark volume", "Creating benchmark volume",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -363,7 +351,7 @@ def run_benchmark(self, pool):
for test in test_matrix: for test in test_matrix:
current_stage += 1 current_stage += 1
update( update(
self, celery,
f"Running benchmark job '{test}'", f"Running benchmark job '{test}'",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -381,7 +369,7 @@ def run_benchmark(self, pool):
# Phase 3 - cleanup # Phase 3 - cleanup
current_stage += 1 current_stage += 1
update( update(
self, celery,
"Cleaning up venchmark volume", "Cleaning up venchmark volume",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -397,7 +385,7 @@ def run_benchmark(self, pool):
current_stage += 1 current_stage += 1
update( update(
self, celery,
"Storing results in database", "Storing results in database",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
@ -415,7 +403,7 @@ def run_benchmark(self, pool):
db_cur=db_cur, db_cur=db_cur,
zkhandler=zkhandler, zkhandler=zkhandler,
) )
fail(self, f"Failed to store test results: {e}", exception=BenchmarkError) fail(celery, f"Failed to store test results: {e}", exception=BenchmarkError)
cleanup( cleanup(
job_name, job_name,
@ -426,7 +414,7 @@ def run_benchmark(self, pool):
current_stage += 1 current_stage += 1
return finish( return finish(
self, celery,
f"Storage benchmark {job_name} completed successfully", f"Storage benchmark {job_name} completed successfully",
current=current_stage, current=current_stage,
total=total_stages, total=total_stages,
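
Note the signature shift above: run_benchmark(self, pool) becomes worker_run_benchmark(zkhandler, celery, config, pool), with the Celery task object passed in by the worker and progress reported through the daemon_lib.celery start/update/finish helpers against a fixed stage count. The general shape of such a worker function, with placeholder phases:

# Shape of a worker function using the progress helpers referenced in the
# diff above; the two phases here are placeholders, not real benchmark steps.
from daemon_lib.celery import start, update, finish

def worker_example_job(zkhandler, celery, config, pool):
    current_stage = 0
    total_stages = 3

    start(celery, f"Running example job on pool '{pool}'",
          current=current_stage, total=total_stages)

    current_stage += 1
    update(celery, "Doing the first phase",
           current=current_stage, total=total_stages)
    # ... first phase work ...

    current_stage += 1
    update(celery, "Doing the second phase",
           current=current_stage, total=total_stages)
    # ... second phase work ...

    current_stage += 1
    return finish(celery, "Example job completed successfully",
                  current=current_stage, total=total_stages)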

View File

@ -70,12 +70,30 @@ def get_static_data():
def get_configuration_path(): def get_configuration_path():
config_file = None
try: try:
return os.environ["PVCD_CONFIG_FILE"] _config_file = "/etc/pvc/pvcnoded.yaml"
except KeyError: if not os.path.exists(_config_file):
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set.') raise
config_file = _config_file
config_type = "legacy"
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "current"
except Exception:
pass
if not config_file:
print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
os._exit(1) os._exit(1)
return config_file, config_type
def get_hostname(): def get_hostname():
node_fqdn = gethostname() node_fqdn = gethostname()
@ -119,12 +137,282 @@ def validate_floating_ip(config, network):
return True, "" return True, ""
def get_configuration(): def get_configuration_current(config_file):
""" print('Loading configuration from file "{}"'.format(config_file))
Parse the configuration of the node daemon.
"""
pvcnoded_config_file = get_configuration_path()
with open(config_file, "r") as cfgfh:
try:
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
except Exception as e:
print(f"ERROR: Failed to parse configuration file: {e}")
os._exit(1)
config = dict()
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
config_thisnode = {
"node": node_hostname,
"node_hostname": node_hostname,
"node_fqdn": node_fqdn,
"node_domain": node_domain,
"node_id": node_id,
}
config = {**config, **config_thisnode}
try:
o_path = o_config["path"]
config_path = {
"plugin_directory": o_path.get(
"plugin_directory", "/usr/share/pvc/plugins"
),
"dynamic_directory": o_path["dynamic_directory"],
"log_directory": o_path["system_log_directory"],
"console_log_directory": o_path["console_log_directory"],
"ceph_directory": o_path["ceph_directory"],
}
# Define our dynamic directory schema
config_path["dnsmasq_dynamic_directory"] = (
config_path["dynamic_directory"] + "/dnsmasq"
)
config_path["pdns_dynamic_directory"] = (
config_path["dynamic_directory"] + "/pdns"
)
config_path["nft_dynamic_directory"] = config_path["dynamic_directory"] + "/nft"
# Define our log directory schema
config_path["dnsmasq_log_directory"] = config_path["log_directory"] + "/dnsmasq"
config_path["pdns_log_directory"] = config_path["log_directory"] + "/pdns"
config_path["nft_log_directory"] = config_path["log_directory"] + "/nft"
config = {**config, **config_path}
o_subsystem = o_config["subsystem"]
config_subsystem = {
"enable_hypervisor": o_subsystem.get("enable_hypervisor", True),
"enable_networking": o_subsystem.get("enable_networking", True),
"enable_storage": o_subsystem.get("enable_storage", True),
"enable_worker": o_subsystem.get("enable_worker", True),
"enable_api": o_subsystem.get("enable_api", True),
}
config = {**config, **config_subsystem}
o_cluster = o_config["cluster"]
config_cluster = {
"cluster_name": o_cluster["name"],
"all_nodes": o_cluster["all_nodes"],
"coordinators": o_cluster["coordinator_nodes"],
}
config = {**config, **config_cluster}
o_cluster_networks = o_cluster["networks"]
for network_type in ["cluster", "storage", "upstream"]:
o_cluster_networks_specific = o_cluster_networks[network_type]
config_cluster_networks_specific = {
f"{network_type}_domain": o_cluster_networks_specific["domain"],
f"{network_type}_dev": o_cluster_networks_specific["device"],
f"{network_type}_mtu": o_cluster_networks_specific["mtu"],
f"{network_type}_network": o_cluster_networks_specific["ipv4"][
"network_address"
]
+ "/"
+ str(o_cluster_networks_specific["ipv4"]["netmask"]),
f"{network_type}_floating_ip": o_cluster_networks_specific["ipv4"][
"floating_address"
]
+ "/"
+ str(o_cluster_networks_specific["ipv4"]["netmask"]),
f"{network_type}_node_ip_selection": o_cluster_networks_specific[
"node_ip_selection"
],
}
if (
o_cluster_networks_specific["ipv4"].get("gateway_address", None)
is not None
):
config[f"{network_type}_gateway"] = o_cluster_networks_specific["ipv4"][
"gateway_address"
]
result, msg = validate_floating_ip(
config_cluster_networks_specific, network_type
)
if not result:
raise MalformedConfigurationError(msg)
network = ip_network(
config_cluster_networks_specific[f"{network_type}_network"]
)
if (
config_cluster_networks_specific[f"{network_type}_node_ip_selection"]
== "by-id"
):
address_id = int(node_id) - 1
else:
# This roundabout solution ensures the given IP is in the subnet and is something valid
address_id = [
idx
for idx, ip in enumerate(list(network.hosts()))
if str(ip)
== config_cluster_networks_specific[
f"{network_type}_node_ip_selection"
]
][0]
config_cluster_networks_specific[
f"{network_type}_dev_ip"
] = f"{list(network.hosts())[address_id]}/{network.prefixlen}"
config = {**config, **config_cluster_networks_specific}
o_database = o_config["database"]
config_database = {
"zookeeper_port": o_database["zookeeper"]["port"],
"keydb_port": o_database["keydb"]["port"],
"keydb_host": o_database["keydb"]["hostname"],
"keydb_path": o_database["keydb"]["path"],
"api_postgresql_port": o_database["postgres"]["port"],
"api_postgresql_host": o_database["postgres"]["hostname"],
"api_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"database"
],
"api_postgresql_user": o_database["postgres"]["credentials"]["api"][
"username"
],
"api_postgresql_password": o_database["postgres"]["credentials"]["api"][
"password"
],
"pdns_postgresql_port": o_database["postgres"]["port"],
"pdns_postgresql_host": o_database["postgres"]["hostname"],
"pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
"database"
],
"pdns_postgresql_user": o_database["postgres"]["credentials"]["dns"][
"username"
],
"pdns_postgresql_password": o_database["postgres"]["credentials"]["dns"][
"password"
],
}
config = {**config, **config_database}
o_timer = o_config["timer"]
config_timer = {
"vm_shutdown_timeout": int(o_timer.get("vm_shutdown_timeout", 180)),
"keepalive_interval": int(o_timer.get("keepalive_interval", 5)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 60)),
}
config = {**config, **config_timer}
o_fencing = o_config["fencing"]
config_fencing = {
"disable_on_ipmi_failure": o_fencing["disable_on_ipmi_failure"],
"fence_intervals": int(o_fencing["intervals"].get("fence_intervals", 6)),
"suicide_intervals": int(o_fencing["intervals"].get("suicide_interval", 0)),
"successful_fence": o_fencing["actions"].get("successful_fence", None),
"failed_fence": o_fencing["actions"].get("failed_fence", None),
"ipmi_hostname": o_fencing["ipmi"]["hostname"].format(node_id=node_id),
"ipmi_username": o_fencing["ipmi"]["username"],
"ipmi_password": o_fencing["ipmi"]["password"],
}
config = {**config, **config_fencing}
o_migration = o_config["migration"]
config_migration = {
"migration_target_selector": o_migration.get("target_selector", "mem"),
}
config = {**config, **config_migration}
o_logging = o_config["logging"]
config_logging = {
"debug": o_logging.get("debug_logging", False),
"file_logging": o_logging.get("file_logging", False),
"stdout_logging": o_logging.get("stdout_logging", False),
"zookeeper_logging": o_logging.get("zookeeper_logging", False),
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_cluster_details", False
),
"log_monitoring_details": o_logging.get("log_monitoring_details", False),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
config = {**config, **config_logging}
o_guest_networking = o_config["guest_networking"]
config_guest_networking = {
"bridge_dev": o_guest_networking["bridge_device"],
"bridge_mtu": o_guest_networking["bridge_mtu"],
"enable_sriov": o_guest_networking.get("sriov_enable", False),
"sriov_device": o_guest_networking.get("sriov_device", list()),
}
config = {**config, **config_guest_networking}
o_ceph = o_config["ceph"]
config_ceph = {
"ceph_config_file": config["ceph_directory"]
+ "/"
+ o_ceph["ceph_config_file"],
"ceph_admin_keyring": config["ceph_directory"]
+ "/"
+ o_ceph["ceph_keyring_file"],
"ceph_monitor_port": o_ceph["monitor_port"],
"ceph_secret_uuid": o_ceph["secret_uuid"],
"storage_hosts": o_ceph.get("monitor_hosts", None),
}
config = {**config, **config_ceph}
o_api = o_config["api"]
o_api_listen = o_api["listen"]
config_api_listen = {
"api_listen_address": o_api_listen["address"],
"api_listen_port": o_api_listen["port"],
}
config = {**config, **config_api_listen}
o_api_authentication = o_api["authentication"]
config_api_authentication = {
"api_auth_enabled": o_api_authentication.get("enabled", False),
"api_auth_secret_key": o_api_authentication.get("secret_key", ""),
"api_auth_source": o_api_authentication.get("source", "token"),
}
config = {**config, **config_api_authentication}
o_api_ssl = o_api["ssl"]
config_api_ssl = {
"api_ssl_enabled": o_api_ssl.get("enabled", False),
"api_ssl_cert_file": o_api_ssl.get("certificate", None),
"api_ssl_key_file": o_api_ssl.get("private_key", None),
}
config = {**config, **config_api_ssl}
# Use coordinators as storage hosts if not explicitly specified
if not config["storage_hosts"] or len(config["storage_hosts"]) < 1:
config["storage_hosts"] = config["coordinators"]
# Set up our token list if specified
if config["api_auth_source"] == "token":
config["api_auth_tokens"] = o_api["token"]
else:
if config["api_auth_enabled"]:
print(
"WARNING: No authentication method provided; disabling API authentication."
)
config["api_auth_enabled"] = False
# Add our node static data to the config
config["static_data"] = get_static_data()
except Exception as e:
raise MalformedConfigurationError(e)
return config
def get_configuration_legacy(pvcnoded_config_file):
print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
with open(pvcnoded_config_file, "r") as cfgfile:
@@ -168,6 +456,7 @@ def get_configuration():
"enable_hypervisor": o_functions.get("enable_hypervisor", False),
"enable_networking": o_functions.get("enable_networking", False),
"enable_storage": o_functions.get("enable_storage", False),
+"enable_worker": o_functions.get("enable_worker", True),
"enable_api": o_functions.get("enable_api", False),
}
@@ -417,6 +706,20 @@ def get_configuration():
return config
def get_configuration():
"""
Parse the configuration of the node daemon.
"""
pvc_config_file, pvc_config_type = get_configuration_path()
if pvc_config_type == "legacy":
config = get_configuration_legacy(pvc_config_file)
else:
config = get_configuration_current(pvc_config_file)
return config
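
The split between the two parsers hinges on get_configuration_path(), which is not shown in this hunk. A minimal hypothetical sketch of what such a helper could look like, assuming it keys off the PVC_CONFIG_FILE environment variable exported by the new unit files (the old pvcnoded unit exported PVCD_CONFIG_FILE pointing at a per-daemon YAML); the real helper lives in daemon_lib.config and may differ:

# Hypothetical sketch only; the actual helper is part of daemon_lib.config.
import os

def get_configuration_path():
    config_file = os.environ.get("PVC_CONFIG_FILE")
    if config_file is None:
        config_file = os.environ.get("PVCD_CONFIG_FILE")
    if config_file is None:
        raise RuntimeError("No PVC_CONFIG_FILE specified in the environment")
    # Treat an old-style per-daemon YAML path as the legacy format
    if config_file.endswith(".yaml"):
        return config_file, "legacy"
    return config_file, "current"
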
def validate_directories(config):
if not os.path.exists(config["dynamic_directory"]):
os.makedirs(config["dynamic_directory"])

View File

@@ -76,9 +76,11 @@ class Logger(object):
self.config = config
if self.config["file_logging"]:
-self.logfile = self.config["log_directory"] + "/pvc.log"
+self.logfile = (
+self.config["log_directory"] + "/" + self.config["daemon_name"] + ".log"
+)
# We open the logfile for the duration of our session, but have a hup function
-self.writer = open(self.logfile, "a", buffering=0)
+self.writer = open(self.logfile, "a")
self.last_colour = ""
self.last_prompt = ""
@@ -91,14 +93,12 @@ class Logger(object):
# Provide a hup function to close and reopen the writer
def hup(self):
self.writer.close()
-self.writer = open(self.logfile, "a", buffering=0)
+self.writer = open(self.logfile, "a")
# Provide a termination function so all messages are flushed before terminating the main daemon
def terminate(self):
-if self.config["file_logging"]:
-self.writer.close()
if self.config["zookeeper_logging"]:
-self.out("Waiting 15s for Zookeeper message queue to drain", state="s")
+self.out("Waiting for Zookeeper message queue to drain", state="s")
tick_count = 0
while not self.zookeeper_queue.empty():
@@ -110,6 +110,9 @@ class Logger(object):
self.zookeeper_logger.stop()
self.zookeeper_logger.join()
+if self.config["file_logging"]:
+self.writer.close()
# Output function
def out(self, message, state=None, prefix=""):
# Get the date
@@ -139,20 +142,28 @@ class Logger(object):
if prefix != "":
prefix = prefix + " - "
-# Assemble message string
-message = colour + prompt + endc + date + prefix + message
# Log to stdout
if self.config["stdout_logging"]:
-print(message)
+# Assemble output string
+output = colour + prompt + endc + date + prefix + message
+print(output)
# Log to file
if self.config["file_logging"]:
-self.writer.write(message + "\n")
+# Assemble output string
+output = colour + prompt + endc + date + prefix + message
+self.writer.write(output + "\n")
+self.writer.flush()
# Log to Zookeeper
if self.config["zookeeper_logging"]:
-self.zookeeper_queue.put(message)
+# Set the daemon value (only used here as others do not overlap with different daemons)
+daemon = f"{self.config['daemon_name']}: "
+# Expand to match all daemon names (pvcnoded, pvcworkerd, pvchealthd)
+daemon = "{daemon: <12}".format(daemon=daemon)
+# Assemble output string
+output = daemon + colour + prompt + endc + date + prefix + message
+self.zookeeper_queue.put(output)
# Set last message variables
self.last_colour = colour
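
To keep the combined pvc node log output aligned across daemons, the Zookeeper path now carries a fixed-width daemon prefix. A small illustrative check of that padding (only the "<12" format from the hunk above is taken from the source; the message text is made up):

# Illustrative only: the fixed-width daemon prefix used for Zookeeper log lines.
for name in ("pvcnoded", "pvcworkerd", "pvchealthd"):
    daemon = "{daemon: <12}".format(daemon=f"{name}: ")
    print(repr(daemon + "2023/11/29 21:21:51.000000 Example message"))
# 'pvcnoded:   2023/11/29 21:21:51.000000 Example message'
# 'pvcworkerd: 2023/11/29 21:21:51.000000 Example message'
# 'pvchealthd: 2023/11/29 21:21:51.000000 Example message'
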
@@ -198,16 +209,9 @@ class ZookeeperLogger(Thread):
self.zkhandler.write([("base.logs", ""), (("logs", self.node), "")])
def run(self):
-while not self.connected:
-self.start_zkhandler()
-sleep(1)
+self.start_zkhandler()
self.running = True
-# Get the logs that are currently in Zookeeper and populate our deque
-raw_logs = self.zkhandler.read(("logs.messages", self.node))
-if raw_logs is None:
-raw_logs = ""
-logs = deque(raw_logs.split("\n"), self.max_lines)
while self.running:
# Get a new message
try:
@@ -222,25 +226,26 @@ class ZookeeperLogger(Thread):
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
else:
date = ""
-# Add the message to the deque
-logs.append(f"{date}{message}")
-tick_count = 0
-while True:
-try:
+try:
+with self.zkhandler.writelock(("logs.messages", self.node)):
+# Get the logs that are currently in Zookeeper and populate our deque
+cur_logs = self.zkhandler.read(("logs.messages", self.node))
+if cur_logs is None:
+cur_logs = ""
+logs = deque(cur_logs.split("\n"), self.max_lines - 1)
+# Add the message to the deque
+logs.append(f"{date}{message}")
# Write the updated messages into Zookeeper
self.zkhandler.write(
[(("logs.messages", self.node), "\n".join(logs))]
)
-break
-except Exception:
-# The write failed (connection loss, etc.) so retry for 15 seconds
-sleep(0.5)
-tick_count += 1
-if tick_count > 30:
-break
-else:
-continue
+except Exception:
+continue
return
def stop(self):
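
The "Improve Zookeeper log handling" commit explains the intent here: each append is now a locked read-modify-write, so multiple daemons on one node cannot overwrite each other's lines. A minimal sketch of that pattern with plain kazoo; PVC wraps this in its own ZKHandler writelock/read/write helpers, and the znode paths below are made up:

# Illustrative read-modify-write under a Zookeeper lock, using plain kazoo.
from collections import deque
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/logs/node1/messages")

def append_log_line(line, max_lines=2000):
    lock = zk.Lock("/logs/node1/messages-lock", "logger")
    with lock:  # serialize appends from pvcnoded, pvcworkerd, pvchealthd
        raw, _ = zk.get("/logs/node1/messages")
        logs = deque((raw or b"").decode().split("\n"), max_lines - 1)
        logs.append(line)
        zk.set("/logs/node1/messages", "\n".join(logs).encode())

append_log_line("pvchealthd:  2023/11/29 21:21:51.000000 Example message")
zk.stop()
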

View File

@@ -31,8 +31,6 @@ import uuid
from contextlib import contextmanager
-from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
@@ -167,11 +165,11 @@ def chroot(destination):
def open_db(config):
try:
conn = psycopg2.connect(
-host=config["database_host"],
-port=config["database_port"],
-dbname=config["database_name"],
-user=config["database_user"],
-password=config["database_password"],
+host=config["api_postgresql_host"],
+port=config["api_postgresql_port"],
+dbname=config["api_postgresql_name"],
+user=config["api_postgresql_user"],
+password=config["api_postgresql_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception:
@@ -216,8 +214,14 @@ def open_zk(config):
#
# Main VM provisioning function - executed by the Celery worker
#
-def create_vm(
-celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
+def worker_create_vm(
+celery,
+config,
+vm_name,
+vm_profile,
+define_vm=True,
+start_vm=True,
+script_run_args=[],
):
current_stage = 0
total_stages = 11
@@ -326,9 +330,9 @@ def create_vm(
vm_data["system_architecture"] = stdout.strip()
monitor_list = list()
-coordinator_names = config["storage_hosts"]
-for coordinator in coordinator_names:
-monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
+monitor_names = config["storage_hosts"]
+for monitor in monitor_names:
+monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
vm_data["ceph_monitor_list"] = monitor_list
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]

30
debian/control vendored
View File

@@ -4,20 +4,36 @@ Priority: optional
Maintainer: Joshua Boniface <joshua@boniface.me>
Standards-Version: 3.9.8
Homepage: https://www.boniface.me
-X-Python3-Version: >= 3.2
+X-Python3-Version: >= 3.7
Package: pvc-daemon-node
Architecture: all
-Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
+Depends: systemd, pvc-daemon-common, pvc-daemon-health, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
-Description: Parallel Virtual Cluster node daemon (Python 3)
+Description: Parallel Virtual Cluster node daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC node daemon
+Package: pvc-daemon-health
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
+Description: Parallel Virtual Cluster health daemon
+A KVM/Zookeeper/Ceph-based VM and private cloud manager
+.
+This package installs the PVC health monitoring daemon
+Package: pvc-daemon-worker
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
+Description: Parallel Virtual Cluster worker daemon
+A KVM/Zookeeper/Ceph-based VM and private cloud manager
+.
+This package installs the PVC Celery task worker daemon
Package: pvc-daemon-api
Architecture: all
-Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
+Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
-Description: Parallel Virtual Cluster API daemon (Python 3)
+Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC API daemon
@@ -25,7 +41,7 @@ Description: Parallel Virtual Cluster API daemon (Python 3)
Package: pvc-daemon-common
Architecture: all
Depends: python3-kazoo, python3-psutil, python3-click, python3-lxml
-Description: Parallel Virtual Cluster common libraries (Python 3)
+Description: Parallel Virtual Cluster common libraries
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the common libraries for the daemon and API
@@ -33,7 +49,7 @@ Description: Parallel Virtual Cluster common libraries (Python 3)
Package: pvc-client-cli
Architecture: all
Depends: python3-requests, python3-requests-toolbelt, python3-yaml, python3-lxml, python3-click
-Description: Parallel Virtual Cluster CLI client (Python 3)
+Description: Parallel Virtual Cluster CLI client
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC API command-line client

View File

@@ -1 +0,0 @@
-client-cli/autobackup.sample.yaml usr/share/pvc

View File

@@ -1,10 +1,7 @@
api-daemon/pvcapid.py usr/share/pvc
api-daemon/pvcapid-manage*.py usr/share/pvc
api-daemon/pvc-api-db-upgrade usr/share/pvc
-api-daemon/pvcapid.sample.yaml usr/share/pvc
api-daemon/pvcapid usr/share/pvc
api-daemon/pvcapid.service lib/systemd/system
-api-daemon/pvcworkerd.service lib/systemd/system
-api-daemon/pvcworkerd.sh usr/share/pvc
api-daemon/provisioner usr/share/pvc
api-daemon/migrations usr/share/pvc

View File

@@ -15,9 +15,6 @@ if systemctl is-active --quiet pvcworkerd.service; then
systemctl start pvcworkerd.service
fi
-if [ ! -f /etc/pvc/pvcapid.yaml ]; then
-echo "NOTE: The PVC client API daemon (pvcapid.service) and the PVC Worker daemon (pvcworkerd.service) have not been started; create a config file at /etc/pvc/pvcapid.yaml, then run the database configuration (/usr/share/pvc/pvc-api-db-upgrade) and start them manually."
+if [ ! -f /etc/pvc/pvc.conf ]; then
+echo "NOTE: The PVC client API daemon (pvcapid.service) and the PVC Worker daemon (pvcworkerd.service) have not been started; create a config file at /etc/pvc/pvc.conf, then run the database configuration (/usr/share/pvc/pvc-api-db-upgrade) and start them manually."
fi
-# Clean up any old sample configs
-rm /etc/pvc/pvcapid.sample.yaml || true

5
debian/pvc-daemon-api.preinst vendored Normal file
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-health.install vendored Normal file
View File

@ -0,0 +1,4 @@
health-daemon/pvchealthd.py usr/share/pvc
health-daemon/pvchealthd usr/share/pvc
health-daemon/pvchealthd.service lib/systemd/system
health-daemon/plugins usr/share/pvc

14
debian/pvc-daemon-health.postinst vendored Normal file
View File

@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvchealthd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvchealthd.service; then
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

6
debian/pvc-daemon-health.preinst vendored Normal file
View File

@ -0,0 +1,6 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/plugins -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-health.prerm vendored Normal file
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvchealthd.service

View File

@@ -1,8 +1,6 @@
node-daemon/pvcnoded.py usr/share/pvc
-node-daemon/pvcnoded.sample.yaml usr/share/pvc
node-daemon/pvcnoded usr/share/pvc
node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvcautoready.service lib/systemd/system
node-daemon/monitoring usr/share/pvc
-node-daemon/plugins usr/share/pvc

View File

@@ -12,8 +12,5 @@ systemctl enable /lib/systemd/system/pvc.target
if systemctl is-active --quiet pvcnoded.service; then
echo "NOTE: The PVC node daemon (pvcnoded.service) has not been restarted; this is up to the administrator."
else
-echo "NOTE: The PVC node daemon (pvcnoded.service) has not been started; create a config file at /etc/pvc/pvcnoded.yaml then start it."
+echo "NOTE: The PVC node daemon (pvcnoded.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi
-# Clean up any old sample configs
-rm /etc/pvc/pvcnoded.sample.yaml || true

View File

@@ -2,4 +2,4 @@
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
-find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
+find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-worker.install vendored Normal file
View File

@ -0,0 +1,4 @@
worker-daemon/pvcworkerd.sh usr/share/pvc
worker-daemon/pvcworkerd.py usr/share/pvc
worker-daemon/pvcworkerd usr/share/pvc
worker-daemon/pvcworkerd.service lib/systemd/system

14
debian/pvc-daemon-worker.postinst vendored Normal file
View File

@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvcworkerd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvcworkerd.service; then
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC worker daemon (pvcworkerd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

5
debian/pvc-daemon-worker.preinst vendored Normal file
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcworkerd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-worker.prerm vendored Normal file
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvcworkerd.service

View File

@@ -8,7 +8,7 @@ import os
import sys
import json
-os.environ['PVC_CONFIG_FILE'] = "./api-daemon/pvcapid.sample.yaml"
+os.environ['PVC_CONFIG_FILE'] = "./pvc.sample.conf"
sys.path.append('api-daemon')

1
health-daemon/daemon_lib Symbolic link
View File

@ -0,0 +1 @@
../daemon-common

View File

@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -40,7 +40,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
@@ -75,7 +75,7 @@ class MonitoringPluginScript(MonitoringPlugin):
# Set the health delta to 0 (no change)
health_delta = 0
# Craft a message that can be used by the clients
-message = "Successfully connected to Libvirtd on localhost"
+message = "Successfully connected to KeyDB/Redis on localhost"
# Check the Zookeeper connection
try:

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

24
health-daemon/pvchealthd.py Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvchealthd.py - Health daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.Daemon # noqa: F401
pvchealthd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file
[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Daemon.py - Health daemon main entrypoint
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.util.zookeeper
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
import pvchealthd.objects.NodeInstance as NodeInstance
import daemon_lib.config as cfg
import daemon_lib.log as log
from time import sleep
import os
import signal
# Daemon version
version = "0.9.82"
##########################################################
# Entrypoint
##########################################################
def entrypoint():
monitoring_instance = None
# Get our configuration
config = cfg.get_configuration()
config["daemon_name"] = "pvchealthd"
config["daemon_version"] = version
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|--------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster health daemon v{0: <20} |".format(version))
logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|--------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
# Connect to Zookeeper and return our handler and current schema version
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, monitoring_instance
logger.out("Terminating pvchealthd and cleaning up", state="s")
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
del zkhandler
except Exception:
pass
logger.out("Terminated health daemon", state="s")
logger.terminate()
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum="", frame=""):
cleanup(failure=False)
# Hangup (logrotate) function
def hup(signum="", frame=""):
if config["file_logging"]:
logger.hup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGHUP, hup)
this_node = NodeInstance.NodeInstance(
config["node_hostname"],
zkhandler,
config,
logger,
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Tick loop; does nothing since everything is async
while True:
try:
sleep(1)
except Exception:
break

View File

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# PluginInstance.py - Class implementing a PVC monitoring instance
+# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
@@ -317,8 +317,17 @@ class MonitoringInstance(object):
)
if successful_plugins < 1:
+self.logger.out(
+"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
+state="e",
+)
return
+self.logger.out(
+f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
+state="s",
+)
# Clean up any old plugin data for which a plugin file no longer exists
plugins_data = self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
@@ -344,6 +353,7 @@ class MonitoringInstance(object):
def shutdown(self):
self.stop_check_timer()
self.run_cleanups()
+return
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
@@ -395,6 +405,11 @@ class MonitoringInstance(object):
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
+self.logger.out(
+"Starting monitoring healthcheck run",
+state="t",
+)
total_health = 100
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
@@ -406,10 +421,7 @@ class MonitoringInstance(object):
plugin_results.append(future.result())
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
-if (
-self.config["log_keepalives"]
-and self.config["log_keepalive_plugin_details"]
-):
+if self.config["log_monitoring_details"]:
self.logger.out(
result.message + f" [-{result.health_delta}]",
state="t",

View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
class NodeInstance(object):
# Initialization function
def __init__(
self,
name,
zkhandler,
config,
logger,
):
# Passed-in variables on creation
self.name = name
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# States
self.daemon_state = "stop"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Node resources
self.health = 100
self.active_domains_count = 0
self.provisioned_domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.daemon", self.name)
)
def watch_node_daemonstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "stop"
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.router", self.name)
)
def watch_node_routerstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "client"
if data != self.coordinator_state:
self.coordinator_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.domain", self.name)
)
def watch_node_domainstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "unknown"
if data != self.domain_state:
self.domain_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.monitoring.health", self.name)
)
def watch_node_health(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 100
try:
data = int(data)
except ValueError:
pass
if data != self.health:
self.health = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.free", self.name)
)
def watch_node_memfree(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.used", self.name)
)
def watch_node_memused(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.allocated", self.name)
)
def watch_node_memalloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
)
def watch_node_vcpualloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.running_domains", self.name)
)
def watch_node_runningdomains(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii").split()
except AttributeError:
data = []
if len(data) != self.active_domains_count:
self.active_domains_count = len(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
)
def watch_node_domainscount(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.provisioned_domains_count:
self.provisioned_domains_count = data
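
Each watcher above follows the same kazoo DataWatch pattern: the decorated function is re-invoked on every change, data arrives as bytes (or None once the znode is gone, which is what the AttributeError fallbacks handle), and returning False tears the watch down. A minimal standalone sketch of that pattern with plain kazoo; the znode path and values are made up:

# Illustrative only: the kazoo DataWatch pattern used by NodeInstance above.
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/nodes/node1/health")

state = {"health": 100}

@zk.DataWatch("/nodes/node1/health")
def watch_health(data, stat, event=None):
    if event and event.type == "DELETED":
        return False  # stop watching once the znode disappears
    try:
        value = int(data.decode("ascii"))
    except AttributeError:
        value = 100  # data is None before the znode holds anything
    except ValueError:
        value = state["health"]
    if value != state["health"]:
        state["health"] = value

zk.set("/nodes/node1/health", b"75")
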

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
# zookeeper.py - Utility functions for pvcnoded Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out(
"Connecting to Zookeeper on coordinator nodes {}".format(
config["coordinators"]
),
state="i",
)
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out(
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
)
os._exit(1)
logger.out("Validating Zookeeper schema", state="i")
try:
node_schema_version = int(
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
)
except Exception:
node_schema_version = int(zkhandler.read("base.schema.version"))
zkhandler.write(
[
(
("node.data.active_schema", config["node_hostname"]),
node_schema_version,
)
]
)
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest installed schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
zkhandler.write(
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
)
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children("base.node"):
node_latest_schema_version.append(
int(zkhandler.read(("node.data.latest_schema", node)))
)
# This is true if all elements of the latest schema version are identical to the latest version,
# i.e. they have all had the latest schema installed and ready to load.
if node_latest_schema_version.count(latest_schema_version) == len(
node_latest_schema_version
):
zkhandler.write([("base.schema.version", latest_schema_version)])
return zkhandler, node_schema_version
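
The block above implements a simple schema-upgrade consensus: each node records the latest schema it has installed, and only when every node under base.node reports that same latest version is the cluster-wide base.schema.version bumped. A worked illustration of the count check (node counts and version numbers are made up):

# Illustrative only: the "all nodes report the latest schema" check from connect() above.
latest_schema_version = 9
node_latest_schema_version = [9, 9, 8]  # two nodes upgraded, one not yet

ready = node_latest_schema_version.count(latest_schema_version) == len(
    node_latest_schema_version
)
print(ready)  # False; base.schema.version stays at the old version

node_latest_schema_version = [9, 9, 9]  # the last node catches up
ready = node_latest_schema_version.count(latest_schema_version) == len(
    node_latest_schema_version
)
print(ready)  # True; the last node to update fires the master version bump
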
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out("Found schema violations, applying", state="i")
zkhandler.schema.apply(zkhandler)
else:
logger.out("Schema successfully validated", state="o")
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config["daemon_mode"] == "coordinator":
init_routerstate = "secondary"
else:
init_routerstate = "client"
if zkhandler.exists(("node", config["node_hostname"])):
logger.out(
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
)
# Update static data just in case it's changed
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
]
)
else:
logger.out(
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
state="i",
)
keepalive_time = int(time.time())
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.domain", config["node_hostname"]), "flushed"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
(("node.memory.total", config["node_hostname"]), "0"),
(("node.memory.used", config["node_hostname"]), "0"),
(("node.memory.free", config["node_hostname"]), "0"),
(("node.memory.allocated", config["node_hostname"]), "0"),
(("node.memory.provisioned", config["node_hostname"]), "0"),
(("node.vcpu.allocated", config["node_hostname"]), "0"),
(("node.cpu.load", config["node_hostname"]), "0.0"),
(("node.running_domains", config["node_hostname"]), "0"),
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
(("node.count.networks", config["node_hostname"]), "0"),
]
)

View File

@ -1,216 +0,0 @@
---
# pvcnoded configuration file example
#
# This configuration file specifies details for this node in PVC. Multiple node
# blocks can be added but only the one matching the current system nodename will
# be used by the local daemon. Default values are not supported; the values in
# this sample configuration are considered defaults and, with adjustment of the
# nodename section and coordinators list, can be used as-is on a Debian system.
#
# Copy this example to /etc/pvc/pvcnoded.conf and edit to your needs
pvc:
# node: The (short) hostname of the node, set during provisioning
node: pvchv1
# debug: Enable or disable debug output
debug: False
# functions: The daemon functions to enable
functions:
# enable_hypervisor: Enable or disable hypervisor functionality
# This should never be False except in very advanced usecases
enable_hypervisor: True
# enable_networking: Enable or disable virtual networking and routing functionality
enable_networking: True
# enable_storage: Enable or disable Ceph storage management functionality
enable_storage: True
# enable_api: Enable or disable the API client, if installed, when node is Primary
enable_api: True
# cluster: Cluster-level configuration
cluster:
# coordinators: The list of cluster coordinator hostnames
coordinators:
- pvchv1
- pvchv2
- pvchv3
# networks: Cluster-level network configuration
# OPTIONAL if enable_networking: False
networks:
# upstream: Upstream routed network for in- and out-bound upstream networking
upstream:
# domain: Upstream domain name, may be None
domain: "mydomain.net"
# network: Upstream network block
network: "1.1.1.0/24"
# floating_ip: Upstream floating IP address for the primary coordinator
floating_ip: "1.1.1.10/24"
# gateway: Upstream static default gateway, if applicable
gateway: "1.1.1.1"
# cluster: Cluster internal network for node communication and client virtual networks
cluster:
# domain: Cluster internal domain name
domain: "pvc.local"
# network: Cluster internal network block
network: "10.255.0.0/24"
# floating_ip: Cluster internal floating IP address for the primary coordinator
floating_ip: "10.255.0.254/24"
# storage: Cluster internal network for storage traffic
storage:
# domain: Cluster storage domain name
domain: "pvc.storage"
# network: Cluster storage network block
network: "10.254.0.0/24"
# floating_ip: Cluster storage floating IP address for the primary coordinator
floating_ip: "10.254.0.254/24"
# coordinator: Coordinator-specific configuration
# OPTIONAL if enable_networking: False
coordinator:
# dns: DNS aggregator subsystem
dns:
# database: Patroni PostgreSQL database configuration
database:
# host: PostgreSQL hostname, invariably 'localhost'
host: localhost
# port: PostgreSQL port, invariably 'localhost'
port: 5432
# name: PostgreSQL database name, invariably 'pvcdns'
name: pvcdns
# user: PostgreSQL username, invariable 'pvcdns'
user: pvcdns
# pass: PostgreSQL user password, randomly generated
pass: pvcdns
# metadata: Metadata API subsystem
metadata:
# database: Patroni PostgreSQL database configuration
database:
# host: PostgreSQL hostname, invariably 'localhost'
host: localhost
# port: PostgreSQL port, invariably 'localhost'
port: 5432
# name: PostgreSQL database name, invariably 'pvcapi'
name: pvcapi
# user: PostgreSQL username, invariable 'pvcapi'
user: pvcapi
# pass: PostgreSQL user password, randomly generated
pass: pvcapi
# system: Local PVC instance configuration
system:
# intervals: Intervals for keepalives and fencing
intervals:
# vm_shutdown_timeout: Number of seconds for a VM to 'shutdown' before being forced off
vm_shutdown_timeout: 180
# keepalive_interval: Number of seconds between keepalive/status updates
keepalive_interval: 5
# monitoring_interval: Number of seconds between monitoring check updates
monitoring_interval: 60
# fence_intervals: Number of keepalive_intervals to declare a node dead and fence it
fence_intervals: 6
# suicide_intervals: Numer of keepalive_intervals before a node considers itself dead and self-fences, 0 to disable
suicide_intervals: 0
# fencing: Node fencing configuration
fencing:
# actions: Actions to take after a fence trigger
actions:
# successful_fence: Action to take after successfully fencing a node, options: migrate, None
successful_fence: migrate
# failed_fence: Action to take after failing to fence a node, options: migrate, None
failed_fence: None
# ipmi: Local system IPMI options
ipmi:
# host: Hostname/IP of the local system's IPMI interface, must be reachable
host: pvchv1-lom
# user: Local system IPMI username
user: admin
# pass: Local system IPMI password
pass: Passw0rd
# migration: Migration option configuration
migration:
# target_selector: Criteria to select the ideal migration target, options: mem, memprov, load, vcpus, vms
target_selector: mem
# configuration: Local system configurations
configuration:
# directories: PVC system directories
directories:
# plugin_directory: Directory containing node monitoring plugins
plugin_directory: "/usr/share/pvc/plugins"
# dynamic_directory: Temporary in-memory directory for active configurations
dynamic_directory: "/run/pvc"
# log_directory: Logging directory
log_directory: "/var/log/pvc"
# console_log_directory: Libvirt console logging directory
console_log_directory: "/var/log/libvirt"
# logging: PVC logging configuration
logging:
# file_logging: Enable or disable logging to files under log_directory
file_logging: True
# stdout_logging: Enable or disable logging to stdout (i.e. journald)
stdout_logging: True
# zookeeper_logging: Enable ot disable logging to Zookeeper (for `pvc node log` functionality)
zookeeper_logging: True
# log_colours: Enable or disable ANSI colours in log output
log_colours: True
# log_dates: Enable or disable date strings in log output
log_dates: True
# log_keepalives: Enable or disable keepalive logging
log_keepalives: True
# log_keepalive_cluster_details: Enable or disable node status logging during keepalive
log_keepalive_cluster_details: True
# log_keepalive_plugin_details: Enable or disable node health plugin logging during keepalive
log_keepalive_plugin_details: True
# console_log_lines: Number of console log lines to store in Zookeeper per VM
console_log_lines: 1000
# node_log_lines: Number of node log lines to store in Zookeeper per node
node_log_lines: 2000
# networking: PVC networking configuration
# OPTIONAL if enable_networking: False
networking:
# bridge_device: Underlying device to use for bridged vLAN networks; usually the device of <cluster>
bridge_device: ens4
# bridge_mtu: The MTU of the underlying device used for bridged vLAN networks, and thus the maximum
# MTU of the overlying bridge devices.
bridge_mtu: 1500
# sriov_enable: Enable or disable (default if absent) SR-IOV network support
sriov_enable: False
# sriov_device: Underlying device(s) to use for SR-IOV networks; can be bridge_device or other NIC(s)
sriov_device:
# The physical device name
- phy: ens1f1
# The preferred MTU of the physical device; OPTIONAL - defaults to the interface default if unset
mtu: 9000
# The number of VFs to enable on this device
# NOTE: This defines the maximum number of VMs which can be provisioned on this physical device; VMs
# are allocated to these VFs manually by the administrator and thus all nodes should have the
# same number
# NOTE: This value cannot be changed at runtime on Intel(R) NICs; the node will need to be restarted
# if this value changes
vfcount: 8
# upstream: Upstream physical interface device
upstream:
# device: Upstream interface device name
device: ens4
# mtu: Upstream interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Upstream interface IP address, options: by-id, <static>/<mask>
address: by-id
# cluster: Cluster (VNIC) physical interface device
cluster:
# device: Cluster (VNIC) interface device name
device: ens4
# mtu: Cluster (VNIC) interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Cluster (VNIC) interface IP address, options: by-id, <static>/<mask>
address: by-id
# storage: Storage (Ceph OSD) physical interface device
storage:
# device: Storage (Ceph OSD) interface device name
device: ens4
# mtu: Storage (Ceph OSD) interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Storage (Ceph OSD) interface IP address, options: by-id, <static>/<mask>
address: by-id
# storage; PVC storage configuration
# OPTIONAL if enable_storage: False
storage:
# ceph_config_file: The config file containing the Ceph cluster configuration
ceph_config_file: "/etc/ceph/ceph.conf"
# ceph_admin_keyring: The file containing the Ceph client admin keyring
ceph_admin_keyring: "/etc/ceph/ceph.client.admin.keyring"

View File

@@ -10,7 +10,7 @@ PartOf = pvc.target
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
-Environment = PVCD_CONFIG_FILE=/etc/pvc/pvcnoded.yaml
+Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcnoded.py
ExecStopPost = /bin/sleep 2

View File

@@ -20,14 +20,12 @@
###############################################################################
import pvcnoded.util.keepalive
-import pvcnoded.util.config
import pvcnoded.util.fencing
import pvcnoded.util.networking
import pvcnoded.util.services
import pvcnoded.util.libvirt
import pvcnoded.util.zookeeper
-import pvcnoded.objects.MonitoringInstance as MonitoringInstance
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
@@ -36,6 +34,7 @@ import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
import pvcnoded.objects.CephInstance as CephInstance
+import daemon_lib.config as cfg
import daemon_lib.log as log
import daemon_lib.common as common
@@ -59,45 +58,40 @@ version = "0.9.82"
def entrypoint():
keepalive_timer = None
-monitoring_instance = None
# Get our configuration
-config = pvcnoded.util.config.get_configuration()
+config = cfg.get_configuration()
-config["pvcnoded_version"] = version
+config["daemon_name"] = "pvcnoded"
+config["daemon_version"] = version
-# Set some useful booleans for later (fewer characters)
-debug = config["debug"]
-if debug:
-print("DEBUG MODE ENABLED")
# Create and validate our directories
-pvcnoded.util.config.validate_directories(config)
+cfg.validate_directories(config)
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|----------------------------------------------------------|") logger.out("|--------------------------------------------------------------|")
logger.out("| |") logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |") logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |") logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |") logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |") logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |") logger.out("| |")
logger.out("|----------------------------------------------------------|") logger.out("|--------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster node daemon v{0: <18} |".format(version)) logger.out("| Parallel Virtual Cluster node daemon v{0: <22} |".format(version))
logger.out("| Debug: {0: <49} |".format(str(config["debug"]))) logger.out("| Debug: {0: <53} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <50} |".format(config["node_fqdn"])) logger.out("| FQDN: {0: <54} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <50} |".format(config["node_hostname"])) logger.out("| Host: {0: <54} |".format(config["node_hostname"]))
logger.out("| ID: {0: <52} |".format(config["node_id"])) logger.out("| ID: {0: <56} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <41} |".format(config["ipmi_hostname"])) logger.out("| IPMI hostname: {0: <45} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |") logger.out("| Machine details: |")
logger.out("| CPUs: {0: <48} |".format(config["static_data"][0])) logger.out("| CPUs: {0: <52} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <48} |".format(config["static_data"][3])) logger.out("| Arch: {0: <52} |".format(config["static_data"][3]))
logger.out("| OS: {0: <50} |".format(config["static_data"][2])) logger.out("| OS: {0: <54} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <46} |".format(config["static_data"][1])) logger.out("| Kernel: {0: <50} |".format(config["static_data"][1]))
logger.out("|----------------------------------------------------------|") logger.out("|--------------------------------------------------------------|")
logger.out("") logger.out("")
logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s") logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
@ -206,7 +200,7 @@ def entrypoint():
# Define a cleanup function # Define a cleanup function
def cleanup(failure=False): def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance nonlocal logger, zkhandler, keepalive_timer, d_domain
logger.out("Terminating pvcnoded and cleaning up", state="s") logger.out("Terminating pvcnoded and cleaning up", state="s")
@ -255,13 +249,6 @@ def entrypoint():
except Exception: except Exception:
pass pass
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Set stop state in Zookeeper # Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")]) zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@ -399,8 +386,8 @@ def entrypoint():
node_list = new_node_list node_list = new_node_list
logger.out( logger.out(
f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}', f'{logger.fmt_cyan}Node list:{logger.fmt_end} {" ".join(node_list)}',
state="i", state="s",
) )
# Update node objects lists # Update node objects lists
@ -567,8 +554,8 @@ def entrypoint():
# Update the new list # Update the new list
network_list = new_network_list network_list = new_network_list
logger.out( logger.out(
f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}', f'{logger.fmt_cyan}Network list:{logger.fmt_end} {" ".join(network_list)}',
state="i", state="s",
) )
# Update node objects list # Update node objects list
@ -898,8 +885,8 @@ def entrypoint():
sriov_vf_list = sorted(new_sriov_vf_list) sriov_vf_list = sorted(new_sriov_vf_list)
logger.out( logger.out(
f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}', f'{logger.fmt_cyan}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
state="i", state="s",
) )
if config["enable_hypervisor"]: if config["enable_hypervisor"]:
@ -925,8 +912,8 @@ def entrypoint():
# Update the new list # Update the new list
domain_list = new_domain_list domain_list = new_domain_list
logger.out( logger.out(
f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}', f'{logger.fmt_cyan}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
state="i", state="s",
) )
# Update node objects' list # Update node objects' list
@ -952,8 +939,8 @@ def entrypoint():
# Update the new list # Update the new list
osd_list = new_osd_list osd_list = new_osd_list
logger.out( logger.out(
f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}', f'{logger.fmt_cyan}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
state="i", state="s",
) )
# Pool objects # Pool objects
@ -977,8 +964,8 @@ def entrypoint():
# Update the new list # Update the new list
pool_list = new_pool_list pool_list = new_pool_list
logger.out( logger.out(
f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}', f'{logger.fmt_cyan}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
state="i", state="s",
) )
# Volume objects (in each pool) # Volume objects (in each pool)
@ -1009,15 +996,10 @@ def entrypoint():
# Update the new list # Update the new list
volume_list[pool] = new_volume_list volume_list[pool] = new_volume_list
logger.out( logger.out(
f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}', f'{logger.fmt_cyan}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
state="i", state="s",
) )
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread # Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer( keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node logger, config, zkhandler, this_node

View File

@ -70,12 +70,12 @@ def get_client_id():
def connect_zookeeper(): def connect_zookeeper():
# We expect the environ to contain the config file # We expect the environ to contain the config file
try: try:
pvcnoded_config_file = os.environ["PVCD_CONFIG_FILE"] pvc_config_file = os.environ["PVC_CONFIG_FILE"]
except Exception: except Exception:
# Default place # Default place
pvcnoded_config_file = "/etc/pvc/pvcnoded.yaml" pvc_config_file = "/etc/pvc/pvc.conf"
with open(pvcnoded_config_file, "r") as cfgfile: with open(pvc_config_file, "r") as cfgfile:
try: try:
o_config = yaml.load(cfgfile, yaml.SafeLoader) o_config = yaml.load(cfgfile, yaml.SafeLoader)
except Exception as e: except Exception as e:
@ -87,7 +87,7 @@ def connect_zookeeper():
try: try:
zk_conn = kazoo.client.KazooClient( zk_conn = kazoo.client.KazooClient(
hosts=o_config["pvc"]["cluster"]["coordinators"] hosts=o_config["cluster"]["coordinator_nodes"]
) )
zk_conn.start() zk_conn.start()
except Exception as e: except Exception as e:

View File

@ -131,11 +131,11 @@ class MetadataAPIInstance(object):
# Helper functions # Helper functions
def open_database(self): def open_database(self):
conn = psycopg2.connect( conn = psycopg2.connect(
host=self.config["metadata_postgresql_host"], host=self.config["api_postgresql_host"],
port=self.config["metadata_postgresql_port"], port=self.config["api_postgresql_port"],
dbname=self.config["metadata_postgresql_dbname"], dbname=self.config["api_postgresql_dbname"],
user=self.config["metadata_postgresql_user"], user=self.config["api_postgresql_user"],
password=self.config["metadata_postgresql_password"], password=self.config["api_postgresql_password"],
) )
cur = conn.cursor(cursor_factory=RealDictCursor) cur = conn.cursor(cursor_factory=RealDictCursor)
return conn, cur return conn, cur
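
The renamed api_postgresql_* keys presumably originate in the database.postgres section of the new pvc.conf shown later in this diff. A hypothetical flattening, for illustration only; the real mapping lives in daemon_lib.config:

def flatten_api_postgres_config(o_config):
    # Not the actual parser; just shows where each renamed key plausibly
    # comes from in the new configuration layout.
    postgres = o_config["database"]["postgres"]
    api_creds = postgres["credentials"]["api"]
    return {
        "api_postgresql_host": postgres["hostname"],
        "api_postgresql_port": postgres["port"],
        "api_postgresql_dbname": api_creds["database"],
        "api_postgresql_user": api_creds["username"],
        "api_postgresql_password": api_creds["password"],
    }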

View File

@ -743,10 +743,27 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
) )
# Recreate the environment we need for dnsmasq # Recreate the environment we need for dnsmasq
pvcnoded_config_file = os.environ["PVCD_CONFIG_FILE"] pvc_config_file = None
try:
_config_file = "/etc/pvc/pvcnoded.yaml"
if not os.path.exists(_config_file):
raise
pvc_config_file = _config_file
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
pvc_config_file = _config_file
except Exception:
pass
if pvc_config_file is None:
raise Exception
dhcp_environment = { dhcp_environment = {
"DNSMASQ_BRIDGE_INTERFACE": self.bridge_nic, "DNSMASQ_BRIDGE_INTERFACE": self.bridge_nic,
"PVCD_CONFIG_FILE": pvcnoded_config_file, "PVC_CONFIG_FILE": pvc_config_file,
} }
# Define the dnsmasq config fragments # Define the dnsmasq config fragments
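
The fallback added here reads as: the PVC_CONFIG_FILE environment variable wins when it points at an existing file, otherwise the legacy pvcnoded.yaml is used if it is still present. An equivalent sketch of that logic (not the project's code):

import os

def find_pvc_config_file():
    # Environment variable takes precedence when it names an existing file.
    env_path = os.environ.get("PVC_CONFIG_FILE")
    if env_path and os.path.exists(env_path):
        return env_path
    # Otherwise fall back to the legacy node daemon configuration, if any.
    if os.path.exists("/etc/pvc/pvcnoded.yaml"):
        return "/etc/pvc/pvcnoded.yaml"
    raise FileNotFoundError("No PVC configuration file found")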

View File

@ -700,6 +700,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
active_coordinator_state = this_node.coordinator_state active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now() runtime_start = datetime.now()
logger.out(
"Starting node keepalive run",
state="t",
)
# Set the migration selector in Zookeeper for clients to read # Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]: if config["enable_hypervisor"]:

View File

@ -70,14 +70,16 @@ def start_ceph_mgr(logger, config):
def start_keydb(logger, config): def start_keydb(logger, config):
if config["enable_api"] and config["daemon_mode"] == "coordinator": if (config["enable_api"] or config["enable_worker"]) and config[
"daemon_mode"
] == "coordinator":
logger.out("Starting KeyDB daemon", state="i") logger.out("Starting KeyDB daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess? # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start keydb-server.service") common.run_os_command("systemctl start keydb-server.service")
def start_api_worker(logger, config): def start_worker(logger, config):
if config["enable_api"]: if config["enable_worker"]:
logger.out("Starting Celery Worker daemon", state="i") logger.out("Starting Celery Worker daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess? # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvcworkerd.service") common.run_os_command("systemctl start pvcworkerd.service")
@ -91,7 +93,7 @@ def start_system_services(logger, config):
start_ceph_mon(logger, config) start_ceph_mon(logger, config)
start_ceph_mgr(logger, config) start_ceph_mgr(logger, config)
start_keydb(logger, config) start_keydb(logger, config)
start_api_worker(logger, config) start_worker(logger, config)
logger.out("Waiting 10 seconds for daemons to start", state="s") logger.out("Waiting 10 seconds for daemons to start", state="s")
sleep(10) sleep(10)

View File

@ -123,7 +123,7 @@ def setup_node(logger, config, zkhandler):
), ),
( (
("node.data.pvc_version", config["node_hostname"]), ("node.data.pvc_version", config["node_hostname"]),
config["pvcnoded_version"], config["daemon_version"],
), ),
( (
("node.ipmi.hostname", config["node_hostname"]), ("node.ipmi.hostname", config["node_hostname"]),
@ -159,7 +159,7 @@ def setup_node(logger, config, zkhandler):
), ),
( (
("node.data.pvc_version", config["node_hostname"]), ("node.data.pvc_version", config["node_hostname"]),
config["pvcnoded_version"], config["daemon_version"],
), ),
( (
("node.ipmi.hostname", config["node_hostname"]), ("node.ipmi.hostname", config["node_hostname"]),

pvc.sample.conf (new file, 452 lines)
View File

@ -0,0 +1,452 @@
---
# PVC system configuration - example file
#
# This configuration file defines the details of a PVC cluster.
# It is used by several daemons on the system, including pvcnoded, pvcapid, pvcworkerd, and pvchealthd.
#
# This file will normally be written by the PVC Ansible framework; this example is provided for reference
# Paths configuration
path:
# Plugin directory
plugin_directory: "/usr/share/pvc/plugins"
# Dynamic directory
dynamic_directory: "/run/pvc"
# System log directory
system_log_directory: "/var/log/pvc"
# VM Console log directory (set by Libvirt)
console_log_directory: "/var/log/libvirt"
# Ceph configuration directory (set by Ceph/Ansible)
ceph_directory: "/etc/ceph"
# Subsystem configuration
# These values can be used to turn various parts of PVC on or off
# Normally, all should be enabled ("yes") except in very custom clusters
subsystem:
# Enable or disable hypervisor functionality
enable_hypervisor: yes
# Enable or disable virtual networking and routing functionality
enable_networking: yes
# Enable or disable Ceph storage management functionality
enable_storage: yes
# Enable or disable the worker client
enable_worker: yes
# Enable or disable the API client, if installed, when node is Primary
enable_api: yes
# Cluster configuration
cluster:
# The name of the cluster
name: pvc1
# The full list of nodes in this cluster
all_nodes:
- pvchv1
- pvchv2
- pvchv3
# The list of coordinator nodes in this cluster (subset of nodes)
coordinator_nodes:
- pvchv1
- pvchv2
- pvchv3
# Hardcoded networks (upstream/cluster/storage)
networks:
# Upstream network, used for inbound and outbound connectivity, API management, etc.
upstream:
# Domain name
domain: "mydomain.net"
# Device
device: ens4
# MTU
mtu: 1500
# IPv4 configuration
ipv4:
# CIDR netmask
netmask: 24
# Network address
network_address: 10.0.0.0
# Floating address
floating_address: 10.0.0.250
# Upstream/default gateway address
gateway_address: 10.0.0.254
# Node IP selection mechanism (either "by-id", or a static IP, no netmask, in the above network)
node_ip_selection: by-id
# Cluster network, used for inter-node communication (VM- and Network-layer), unrouted
cluster:
# Domain name
domain: "pvc.local"
# Device
device: ens4
# MTU
mtu: 1500
# IPv4 configuration
ipv4:
# CIDR netmask
netmask: 24
# Network address
network_address: 10.0.1.0
# Floating address
floating_address: 10.0.1.250
# Node IP selection mechanism (either "by-id", or a static IP, no netmask, in the above network)
node_ip_selection: by-id
# Storage network, used for inter-node communication (Storage-layer), unrouted
storage:
# Domain name
domain: "storage.local"
# Device
device: ens4
# MTU
mtu: 1500
# IPv4 configuration
ipv4:
# CIDR netmask
netmask: 24
# Network address
network_address: 10.0.2.0
# Floating address
floating_address: 10.0.2.250
# Node IP selection mechanism (either "by-id", or a static IP, no netmask, in the above network)
node_ip_selection: by-id
# Database configuration
database:
# Zookeeper client configuration
zookeeper:
# Port number
port: 2181
# KeyDB/Redis client configuration
keydb:
# Port number
port: 6379
# Hostname; use `cluster` network floating IP address
hostname: 10.0.1.250
# Path, usually "/0"
path: "/0"
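# NOTE: These values presumably combine into the Celery/KeyDB broker URI used by pvcworkerd,
# e.g. redis://10.0.1.250:6379/0 (see the celery_task_uri construction in pvcworkerd's
# Daemon.py later in this diff)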
# PostgreSQL client configuration
postgres:
# Port number
port: 5432
# Hostname; use `cluster` network floating IP address
hostname: 10.0.1.250
# Credentials
credentials:
# API database
api:
# Database name
database: pvcapi
# Username
username: pvcapi
# Password
password: pvcapiPassw0rd
# DNS database
dns:
# Database name
database: pvcdns
# Username
username: pvcdns
# Password
password: pvcdnsPassw0rd
# Timer information
timer:
# VM shutdown timeout (seconds)
vm_shutdown_timeout: 180
# Node keepalive interval (seconds)
keepalive_interval: 5
# Monitoring interval (seconds)
monitoring_interval: 60
# Fencing configuration
fencing:
# Whether or not to disable fencing on IPMI failure at startup
# Setting this value to "no" will keep fencing enabled even if the IPMI does not respond during node
# daemon startup. This allows future fencing to be attempted if the IPMI later recovers.
disable_on_ipmi_failure: no
# Fencing intervals
intervals:
# Fence intervals (number of keepalives)
fence_intervals: 6
# Suicide intervals (number of keepalives; 0 to disable)
suicide_intervals: 0
# Fencing actions
actions:
# Successful fence action ("migrate" or "none")
successful_fence: migrate
# Failed fence action ("migrate" or "none")
failed_fence: none
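# For example, with the keepalive_interval of 5 seconds above and fence_intervals of 6, an
# unresponsive node would be fenced roughly 6 x 5 = 30 seconds after its last successful
# keepalive (an approximation; the exact trigger logic lives in the node daemon)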
# IPMI details
ipmi:
# Hostname format; use a "{node_id}" entry for a template, or a literal hostname
hostname: "pvchv{node_id}-lom.mydomain.tld"
# IPMI username
username: admin
# IPMI password
password: S3cur3IPMIP4ssw0rd
# VM migration configuration
migration:
# Target selection default value (mem, memprov, load, vcpus, vms)
target_selector: mem
# Logging configuration
logging:
# Enable or disable debug logging (all services)
debug_logging: yes
# Enable or disable file logging
file_logging: no
# Enable or disable stdout logging (to journald)
stdout_logging: yes
# Enable or disable Zookeeper logging (for "pvc node log" functionality)
zookeeper_logging: yes
# Enable or disable ANSI colour sequences in logs
log_colours: yes
# Enable or disable dates in logs
log_dates: yes
# Enable or disable keepalive event logging
log_keepalives: yes
# Enable or disable cluster detail logging during keepalive events
log_cluster_details: yes
# Enable or disable monitoring detail logging during healthcheck events
log_monitoring_details: yes
# Number of VM console log lines to store in Zookeeper (per VM)
console_log_lines: 1000
# Number of node log lines to store in Zookeeper (per node)
node_log_lines: 2000
# Guest networking configuration
guest_networking:
# Bridge device for "bridged"-type networks
bridge_device: ens4
# Bridge device MTU
bridge_mtu: 1500
# Enable or disable SR-IOV functionality
sriov_enable: no
# SR-IOV configuration (list of PFs)
sriov_device:
# SR-IOV device; if this device isn't found, it is ignored on a given node
- device: ens1f1
# SR-IOV device MTU
mtu: 9000
# Number of VFs on this device
vfcount: 4
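# NOTE: Enabling VFs on a PF generally corresponds to writing this count to the kernel's
# /sys/class/net/<device>/device/sriov_numvfs attribute; as noted for the older node
# daemon configuration above, Intel(R) NICs typically require a restart to change it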
# Ceph configuration
ceph:
# Main config file name
ceph_config_file: "ceph.conf"
# Admin keyring file name
ceph_keyring_file: "ceph.client.admin.keyring"
# Monitor port, usually 6789
monitor_port: 6789
# Monitor host(s); enable only if you want to use hosts other than the coordinators
#monitor_hosts:
# - pvchv1
# - pvchv2
# - pvchv3
# Storage secret UUID, generated during Ansible cluster bootstrap
secret_uuid: ""
# API configuration
api:
# API listening configuration
listen:
# Listen address, usually upstream floating IP
address: 10.0.0.250
# Listen port, usually 7370
port: 7370
# Authentication configuration
authentication:
# Enable or disable authentication
enabled: yes
# Secret key for API cookies (long and secure password or UUID)
secret_key: "1234567890abcdefghijklmnopqrstuvwxyz"
# Authentication source (token, others in future)
source: token
# Token configuration
token:
# A friendly description
- description: "testing"
# The token (long and secure password or UUID)
token: "1234567890abcdefghijklmnopqrstuvwxyz"
# SSL configuration
ssl:
# Enable or disable SSL operation
enabled: no
# Certificate file path
certificate: ""
# Private key file path
private_key: ""
# Automatic backups
autobackup:
# Backup root path on the node, used as the remote mountpoint
# Must be an absolute path beginning with '/'
# If auto_mount is enabled, the remote mount will be mounted on this directory
# If auto_mount is enabled, it is recommended to use a path under `/tmp` for this
# If auto_mount is disabled, a real filesystem must be mounted here (PVC system volumes are small!)
backup_root_path: "/tmp/backups"
# Suffix to the backup root path, used to allow multiple PVC systems to write to a single root path
# Must begin with '/'; leave empty to use the backup root path directly
# Note that most remote mount options can fake this if needed, but it is provided to ensure local compatibility
backup_root_suffix: "/mycluster"
# VM tag(s) to back up
# Only VMs with at least one of the given tag(s) will be backed up; all others will be skipped
backup_tags:
- "backup"
- "mytag"
# Backup schedule: when and what format to take backups
backup_schedule:
full_interval: 7 # Number of total backups between full backups; others are incremental
# > If this number is 1, every backup will be a full backup and no incremental
# backups will be taken
# > If this number is 2, every second backup will be a full backup, etc.
full_retention: 2 # Keep this many full backups; the oldest will be deleted when a new one is
# taken, along with all child incremental backups of that backup
# > Should usually be at least 2 when using incrementals (full_interval > 1) to
# avoid there being too few backups after cleanup from a new full backup
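# For example, with full_interval: 7 and full_retention: 2, one in every seven backups is a
# full backup and the rest are incrementals; once a third full backup is taken, the oldest
# full backup and all of its incremental children are removed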
# Automatic mount settings
# These settings permit running an arbitrary set of commands, ideally a "mount" command or similar, to
# ensure that a remote filesystem is mounted on the backup root path
# While the examples here show absolute paths, that is not required; they will run with the $PATH of the
# executing environment (either the "pvc" command on a CLI or a cron/systemd timer)
# A "{backup_root_path}" f-string/str.format type variable MAY be present in any cmds string to represent
# the above configured root backup path, which is interpolated at runtime
# If multiple commands are given, they will be executed in the order given; if no commands are given,
# nothing is executed, but the keys MUST be present
auto_mount:
enabled: no # Enable automatic mount/unmount support
# These commands are executed at the start of the backup run and should mount a filesystem
mount_cmds:
# This example shows an NFS mount leveraging the backup_root_path variable
- "/usr/sbin/mount.nfs -o nfsvers=3 10.0.0.10:/backups {backup_root_path}"
# These commands are executed at the end of the backup run and should unmount a filesystem
unmount_cmds:
# This example shows a generic umount leveraging the backup_root_path variable
- "/usr/bin/umount {backup_root_path}"
# VIM modeline, requires "set modeline" in your VIMRC
# vim: expandtab shiftwidth=2 tabstop=2 filetype=yaml
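
The auto_mount commands above are interpolated with the configured backup root path and executed in order. A minimal sketch of that behaviour, assuming str.format interpolation and shell-less execution; this is not the actual autobackup code:

import shlex
import subprocess

def run_autobackup_cmds(cmds, backup_root_path):
    # Each configured command string may contain "{backup_root_path}", which is
    # substituted before the command is run; commands execute in the listed order.
    for cmd in cmds:
        rendered = cmd.format(backup_root_path=backup_root_path)
        subprocess.run(shlex.split(rendered), check=True)

# e.g. run_autobackup_cmds(config["autobackup"]["auto_mount"]["mount_cmds"], "/tmp/backups")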

worker-daemon/daemon_lib (new symbolic link, 1 line)
View File

@ -0,0 +1 @@
../daemon-common

worker-daemon/pvcworkerd.py (new executable file, 24 lines)
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvcworkerd.py - Worker daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcworkerd.Daemon # noqa: F401
pvcworkerd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster worker daemon unit file
[Unit]
Description = Parallel Virtual Cluster worker daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvcworkerd.sh
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5. # app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in case "$( cat /etc/debian_version )" in
10.*) 10.*)
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO" CELERY_ARGS="worker --app pvcworkerd.Daemon.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;; ;;
*) *)
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO" CELERY_ARGS="--app pvcworkerd.Daemon.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;; ;;
esac esac

View File

@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Daemon.py - PVC Node Worker daemon
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from celery import Celery
import daemon_lib.config as cfg
from daemon_lib.zkhandler import ZKConnection
from daemon_lib.vm import (
vm_worker_flush_locks,
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from daemon_lib.benchmark import (
worker_run_benchmark,
)
from daemon_lib.vmbuilder import (
worker_create_vm,
)
version = "0.9.82"
config = cfg.get_configuration()
config["daemon_name"] = "pvcworkerd"
config["daemon_version"] = version
celery_task_uri = "redis://{}:{}{}".format(
config["keydb_host"], config["keydb_port"], config["keydb_path"]
)
celery = Celery(
"pvcworkerd",
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return worker_create_vm(
self,
config,
vm_name,
profile_name,
define_vm=define_vm,
start_vm=start_vm,
script_run_args=script_run_args,
)
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def storage_benchmark(self, pool=None, run_on="primary"):
@ZKConnection(config)
def run_storage_benchmark(zkhandler, self, pool):
return worker_run_benchmark(zkhandler, self, config, pool)
return run_storage_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
return run_vm_flush_locks(self, domain, force_unlock=force_unlock)
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
return run_vm_device_attach(self, domain, xml)
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
def entrypoint():
pass
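
For context, the tasks defined above are consumed from per-node queues (matching the --queues $(hostname -s) argument in pvcworkerd.sh), so a client such as pvcapid can target a specific node when dispatching. A hypothetical dispatch, for illustration only; "hv1" is an assumed node hostname and the API's actual routing code may differ:

task = celery.send_task(
    "vm.flush_locks",
    kwargs={"domain": "testvm", "force_unlock": False, "run_on": "hv1"},
    queue="hv1",
)
result = task.get(timeout=60)  # wait for the worker on hv1 to return its result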

View File