Compare commits

...

51 Commits

SHA1 Message Date
7e6d922877 Improve fault detail handling further
Since we already had a "details" field, simply move where it gets added
to the message: later, in generate_fault, after the main message value
has been used to generate the ID.
2023-12-09 16:13:36 -05:00
4ca2381077 Rework metrics output and add combined endpoint 2023-12-09 15:47:40 -05:00
4003204f14 Remove bracketed text from fault_str
This ensures that certain faults, e.g. Ceph status faults, will be
combined despite the added text in brackets, while still keeping them
mostly separate.

Also ensure the health text is updated each time to assist with this, as
this health text may now change independently of the fault ID.
2023-12-09 15:34:18 -05:00
a70c1d63b0 Separate state totals from states, separate states 2023-12-09 13:59:17 -05:00
2bea78d25e Make all remaining limits optional 2023-12-09 13:43:58 -05:00
fd717b702d Use external list of fault states 2023-12-09 12:51:41 -05:00
132cde5591 Add totals and nice-format states
Avoids tons of annoying rewriting in the UI later.
2023-12-09 12:50:19 -05:00
ba565ead4c Report all state combinations in Prom metrics
Ensures that every state combination is always shown in the metrics, even if
it contains 0 entries.
2023-12-09 12:40:37 -05:00
317ca4b98c Move defined state combinations into common 2023-12-09 12:36:32 -05:00
2b8abea8df Remove debug printing 2023-12-09 12:22:36 -05:00
9b3c9f1be5 Add Ceph metrics proxy and health fault counts 2023-12-09 12:22:36 -05:00
7373bfed3f Add Prometheus metric exporter
Adds a "fake" Prometheus metrics endpoint which returns cluster status
information in Prometheus format.
2023-12-09 12:22:36 -05:00
d0e7c19602 Add prometheus client dependencies 2023-12-09 12:22:36 -05:00
f01c12c86b Import from pvcworkerd not pvcapid 2023-12-09 12:22:19 -05:00
0bda095571 Move libvirt_schema and fix other imports 2023-12-09 12:20:29 -05:00
7976e1d2d0 Correct import location in scripts 2023-12-09 12:18:33 -05:00
813aef1463 Fix incorrect UUID key name 2023-12-09 12:14:57 -05:00
5a7ea25266 Fix incorrect database name entries 2023-12-09 12:12:00 -05:00
82a7fd3c80 Add more debugging info to psql 2023-12-07 21:36:05 -05:00
ddd9d9ee07 Adjust psql check to avoid weird failures 2023-12-07 15:07:59 -05:00
9e2e749c55 Combine pvchealthd output into single log message 2023-12-07 14:00:43 -05:00
157b8c20bf Add Patroni output to debug logs 2023-12-07 14:00:35 -05:00
bf158dc2d9 Shorten debug output 2023-12-07 13:31:20 -05:00
1b84553405 Use passed coordinator state 2023-12-07 11:19:26 -05:00
60dac143f2 Use simpler health calculation 2023-12-07 11:17:31 -05:00
a13273335d Add colon to result text 2023-12-07 11:15:42 -05:00
e7f21b7058 Enhance and fix bugs in psql plugin
1. Check Patronictl statuses
2. Don't error during node primary transitions
2023-12-07 11:14:16 -05:00
9dbadfdd6e Move back to per-plugin fault reporting 2023-12-07 11:13:56 -05:00
61b39d0739 Fix incorrect cluster health calculation 2023-12-07 11:13:36 -05:00
4bf80a5913 Fix missing datetime shrink 2023-12-06 17:15:36 -05:00
6c0dfe16cf Improve word splitting for fault messages
This ensures that fault messages are split on word boundaries and that
the column width is set to the longest of these, where applicable.
2023-12-06 17:10:19 -05:00
3fde494fc5 Add status back to short fault list 2023-12-06 16:53:23 -05:00
0945b3faf3 Use same fault formatting for short and long 2023-12-06 16:19:44 -05:00
1416f9edc0 Remove bad sort values 2023-12-06 14:38:29 -05:00
5691f75ac9 Fix bad import 2023-12-06 14:28:32 -05:00
e0bf7f7d1a Fix bad ID values in acknowledge 2023-12-06 14:18:31 -05:00
0c34c88a1f Fix bad dict key name 2023-12-06 14:16:19 -05:00
20acf3295f Add mass ack/delete of faults 2023-12-06 13:59:39 -05:00
4a02c2c8e3 Add additional faults 2023-12-06 13:27:39 -05:00
6fc5c927a1 Properly sort status faults 2023-12-06 13:27:18 -05:00
d1e34e7333 Store fault times only to the second
Any more precision is unnecessary, and dropping it saves 6 characters
when displaying these times elsewhere.
2023-12-06 13:20:18 -05:00
79eb54d5da Move fault generation to common library 2023-12-06 13:17:10 -05:00
536fb2080f Fix get_terminal_size over SSH 2023-12-06 13:11:28 -05:00
2267a9c85d Improve output formatting for simplicity 2023-12-05 10:37:35 -05:00
067e73337f Shorten health IDs to 8 characters 2023-12-04 15:48:27 -05:00
672e58133f Implement interfaces to faults 2023-12-04 01:37:54 -05:00
b59f743690 Improve logging and handling of fault entries 2023-12-01 17:38:28 -05:00
4c3f235e05 Avoid running fault updates in maintenance mode
When the cluster is in maintenance mode, all faults should be ignored.
2023-12-01 17:38:28 -05:00
3dc48c1783 Lower default monitoring interval to 15s
Faults are also reported on the monitoring interval, so 60s seems
too long. Lower this to 15 seconds by default instead.
2023-12-01 17:38:28 -05:00
9c2b1b29ee Add node health to fault states
Adjusts ordering and ensures that node health states are included in
faults if they are less than 50%.

Also adjusts fault ID generation, and runs fault checks only on
coordinator nodes to avoid too many runs.
2023-12-01 17:38:28 -05:00
8594eb697f Add initial fault generation in pvchealthd
References: #164
2023-12-01 17:38:27 -05:00
26 changed files with 1931 additions and 241 deletions
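The headline changes above are the Prometheus-compatible metrics endpoints and the new fault subsystem. As a quick orientation before the per-file diffs, here is a minimal sketch of scraping the new endpoints; the host and port are hypothetical placeholders, while the "/api/v1" prefix and the endpoint paths are taken from the diffs below.

import requests

API_BASE = "http://pvc0.example.com:7370/api/v1"  # hypothetical address and port

# The metrics endpoints are deliberately unauthenticated, so plain GETs suffice
combined = requests.get(f"{API_BASE}/metrics")        # PVC + Ceph metrics in one document
pvc_only = requests.get(f"{API_BASE}/metrics/pvc")    # PVC cluster metrics only
ceph_only = requests.get(f"{API_BASE}/metrics/ceph")  # Ceph metrics proxied from the active MGR

for response in (combined, pvc_only, ceph_only):
    response.raise_for_status()
    print(response.text.splitlines()[0])  # first HELP line of each document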

View File

@ -147,7 +147,7 @@
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
from daemon_lib.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -174,7 +174,7 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
import pvcapid.libvirt_schema as libvirt_schema
import daemon_lib.libvirt_schema as libvirt_schema
import datetime
import random

View File

@ -148,7 +148,7 @@
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
from daemon_lib.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -177,7 +177,7 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
import pvcapid.libvirt_schema as libvirt_schema
import daemon_lib.libvirt_schema as libvirt_schema
import datetime
import random
@ -289,8 +289,8 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
import os
@ -383,8 +383,8 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.ceph as pvc_ceph
for volume in list(reversed(self.vm_data["volumes"])):

View File

@ -147,7 +147,7 @@
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
from daemon_lib.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -186,7 +186,7 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
import pvcapid.libvirt_schema as libvirt_schema
import daemon_lib.libvirt_schema as libvirt_schema
import datetime
import random
@ -301,16 +301,16 @@ class VMBuilderScript(VMBuilder):
This function should use the various exposed PVC commands as indicated to create
RBD block devices and map them to the host as required.
open_zk is exposed from pvcapid.vmbuilder to provide a context manager for opening
open_zk is exposed from daemon_lib.vmbuilder to provide a context manager for opening
connections to the PVC Zookeeper cluster; ensure you also import (and pass it)
the config object from pvcapid.Daemon as well. This context manager then allows
the config object from pvcworkerd.Daemon as well. This context manager then allows
the use of various common daemon library functions, without going through the API.
"""
# Run any imports first
import os
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
@ -446,7 +446,7 @@ class VMBuilderScript(VMBuilder):
# Run any imports first
import os
from pvcapid.vmbuilder import chroot
from daemon_lib.vmbuilder import chroot
# The directory we mounted things on earlier during prepare(); this could very well
# be exposed as a module-level variable if you so choose
@ -718,8 +718,8 @@ GRUB_DISABLE_LINUX_UUID=false
# Run any imports first
import os
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph

View File

@ -147,7 +147,7 @@
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
from daemon_lib.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -186,7 +186,7 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
import pvcapid.libvirt_schema as libvirt_schema
import daemon_lib.libvirt_schema as libvirt_schema
import datetime
import random
@ -301,16 +301,16 @@ class VMBuilderScript(VMBuilder):
This function should use the various exposed PVC commands as indicated to create
RBD block devices and map them to the host as required.
open_zk is exposed from pvcapid.vmbuilder to provide a context manager for opening
open_zk is exposed from daemon_lib.vmbuilder to provide a context manager for opening
connections to the PVC Zookeeper cluster; ensure you also import (and pass it)
the config object from pvcapid.Daemon as well. This context manager then allows
the config object from pvcworkerd.Daemon as well. This context manager then allows
the use of various common daemon library functions, without going through the API.
"""
# Run any imports first
import os
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
@ -446,7 +446,7 @@ class VMBuilderScript(VMBuilder):
# Run any imports first
import os
from pvcapid.vmbuilder import chroot
from daemon_lib.vmbuilder import chroot
import daemon_lib.common as pvc_common
# The directory we mounted things on earlier during prepare(); this could very well
@ -692,8 +692,8 @@ GRUB_SERIAL_COMMAND="serial --speed=115200 --unit=0 --word=8 --parity=no --stop=
# Run any imports first
import os
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph

View File

@ -173,7 +173,7 @@
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
from daemon_lib.vmbuilder import VMBuilder
# Set up some variables for later; if you frequently use these tools, you might benefit from
@ -243,7 +243,7 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
import pvcapid.libvirt_schema as libvirt_schema
import daemon_lib.libvirt_schema as libvirt_schema
import datetime
import random
@ -358,8 +358,8 @@ class VMBuilderScript(VMBuilder):
# Run any imports first; as shown here, you can import anything from the PVC
# namespace, as well as (of course) the main Python namespaces
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.common as pvc_common
import daemon_lib.ceph as pvc_ceph
import json
@ -902,8 +902,8 @@ class VMBuilderScript(VMBuilder):
"""
# Run any imports first
from pvcapid.vmbuilder import open_zk
from pvcapid.Daemon import config
from daemon_lib.vmbuilder import open_zk
from pvcworkerd.Daemon import config
import daemon_lib.ceph as pvc_ceph
# Use this construct for reversing the list, as the normal reverse() messes with the list

View File

@ -622,6 +622,294 @@ class API_Status(Resource):
api.add_resource(API_Status, "/status")
# /metrics
class API_Metrics(Resource):
def get(self):
"""
Return the current PVC cluster status in Prometheus-compatible metrics format and
the Ceph cluster metrics as one document.
Endpoint is unauthenticated to allow metrics scraping without having to deal
with Prometheus authentication compatibility.
---
tags:
- root
responses:
200:
description: OK
400:
description: Bad request
"""
cluster_output, cluster_retcode = api_helper.cluster_metrics()
ceph_output, ceph_retcode = api_helper.ceph_metrics()
if cluster_retcode != 200 or ceph_retcode != 200:
output = "Error: Failed to obtain data"
retcode = 400
else:
output = cluster_output + ceph_output
retcode = 200
response = flask.make_response(output, retcode)
response.mimetype = "text/plain"
return response
api.add_resource(API_Metrics, "/metrics")
# /metrics/pvc
class API_Metrics_PVC(Resource):
def get(self):
"""
Return the current PVC cluster status in Prometheus-compatible metrics format
Endpoint is unauthenticated to allow metrics scraping without having to deal
with Prometheus authentication compatibility.
---
tags:
- root
responses:
200:
description: OK
400:
description: Bad request
"""
cluster_output, cluster_retcode = api_helper.cluster_metrics()
if cluster_retcode != 200:
output = "Error: Failed to obtain data"
retcode = 400
else:
output = cluster_output
retcode = 200
response = flask.make_response(output, retcode)
response.mimetype = "text/plain"
return response
api.add_resource(API_Metrics_PVC, "/metrics/pvc")
# /metrics/ceph
class API_Metrics_Ceph(Resource):
def get(self):
"""
Return the current PVC Ceph Prometheus metrics
Proxies a metrics request to the current active MGR, since this is dynamic
and can't be controlled by PVC easily.
---
tags:
- root
responses:
200:
description: OK
400:
description: Bad request
"""
ceph_output, ceph_retcode = api_helper.ceph_metrics()
if ceph_retcode != 200:
output = "Error: Failed to obtain data"
retcode = 400
else:
output = ceph_output
retcode = 200
response = flask.make_response(output, retcode)
response.mimetype = "text/plain"
return response
api.add_resource(API_Metrics_Ceph, "/metrics/ceph")
# /faults
class API_Faults(Resource):
@RequestParser(
[
{
"name": "sort_key",
"choices": (
"first_reported",
"last_reported",
"acknowledged_at",
"status",
"health_delta",
"message",
),
"helptext": "A valid sort key must be specified",
"required": False,
},
]
)
@Authenticator
def get(self, reqargs):
"""
Return a list of cluster faults
---
tags:
- faults
parameters:
- in: query
name: sort_key
type: string
required: false
description: The fault object key to sort results by
enum:
- first_reported
- last_reported
- acknowledged_at
- status
- health_delta
- message
responses:
200:
description: OK
schema:
type: array
items:
type: object
id: fault
properties:
id:
type: string
description: The ID of the fault
example: "10ae144b78b4cc5fdf09e2ebbac51235"
first_reported:
type: date
description: The first time the fault was reported
example: "2023-12-01 16:47:59.849742"
last_reported:
type: date
description: The last time the fault was reported
example: "2023-12-01 17:39:45.188398"
acknowledged_at:
type: date
description: The time the fault was acknowledged, or empty if not acknowledged
example: "2023-12-01 17:50:00.000000"
status:
type: string
description: The current state of the fault, either "new" or "ack" (acknowledged)
example: "new"
health_delta:
type: integer
description: The health delta (amount it reduces cluster health from 100%) of the fault
example: 25
message:
type: string
description: The textual description of the fault
example: "Node hv1 was at 40% (psur@-10%, psql@-50%) <= 50% health"
"""
return api_helper.fault_list(sort_key=reqargs.get("sort_key", "last_reported"))
@Authenticator
def put(self):
"""
Acknowledge all cluster faults
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_acknowledge_all()
@Authenticator
def delete(self):
"""
Delete all cluster faults
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_delete_all()
api.add_resource(API_Faults, "/faults")
# /faults/<fault_id>
class API_Faults_Element(Resource):
@Authenticator
def get(self, fault_id):
"""
Return a single cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: array
items:
type: object
id: fault
$ref: '#/definitions/fault'
"""
return api_helper.fault_list(limit=fault_id)
@Authenticator
def put(self, fault_id):
"""
Acknowledge a cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_acknowledge(fault_id)
@Authenticator
def delete(self, fault_id):
"""
Delete a cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_delete(fault_id)
api.add_resource(API_Faults_Element, "/faults/<fault_id>")
# /tasks
class API_Tasks(Resource):
@Authenticator

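For the authenticated fault endpoints defined above, a hedged usage sketch follows; the X-Api-Key header name and key value are assumptions not taken from this diff, while the paths, the sort_key parameter, and its allowed values are.

import requests

API_BASE = "http://pvc0.example.com:7370/api/v1"  # hypothetical address and port
HEADERS = {"X-Api-Key": "secret"}                 # assumed authentication header

# List faults sorted by severity; health_delta is one of the documented sort keys
faults = requests.get(
    f"{API_BASE}/faults", params={"sort_key": "health_delta"}, headers=HEADERS
).json()

# Acknowledge, then delete, the most severe fault
if faults:
    fault_id = faults[0]["id"]
    requests.put(f"{API_BASE}/faults/{fault_id}", headers=HEADERS)
    requests.delete(f"{API_BASE}/faults/{fault_id}", headers=HEADERS)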
View File

@ -23,6 +23,8 @@ import flask
import json
import lxml.etree as etree
from re import match
from requests import get
from werkzeug.formparser import parse_form_data
from pvcapid.Daemon import config, strtobool
@ -31,6 +33,7 @@ from daemon_lib.zkhandler import ZKConnection
import daemon_lib.common as pvc_common
import daemon_lib.cluster as pvc_cluster
import daemon_lib.faults as pvc_faults
import daemon_lib.node as pvc_node
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
@ -118,6 +121,317 @@ def cluster_maintenance(zkhandler, maint_state="false"):
return retdata, retcode
#
# Metrics functions
#
@pvc_common.Profiler(config)
@ZKConnection(config)
def cluster_metrics(zkhandler):
"""
Format status data from cluster_status into Prometheus-compatible metrics
"""
# Get general cluster information
status_retflag, status_data = pvc_cluster.get_info(zkhandler)
if not status_retflag:
return "Error: Status data threw error", 400
faults_retflag, faults_data = pvc_faults.get_list(zkhandler)
if not faults_retflag:
return "Error: Faults data threw error", 400
node_retflag, node_data = pvc_node.get_list(zkhandler)
if not node_retflag:
return "Error: Node data threw error", 400
vm_retflag, vm_data = pvc_vm.get_list(zkhandler)
if not vm_retflag:
return "Error: VM data threw error", 400
osd_retflag, osd_data = pvc_ceph.get_list_osd(zkhandler)
if not osd_retflag:
return "Error: OSD data threw error", 400
output_lines = list()
output_lines.append("# HELP pvc_info PVC cluster information")
output_lines.append("# TYPE pvc_info gauge")
output_lines.append(
f"pvc_info{{primary_node=\"{status_data['primary_node']}\", version=\"{status_data['pvc_version']}\", upstream_ip=\"{status_data['upstream_ip']}\"}} 1"
)
output_lines.append("# HELP pvc_cluster_maintenance PVC cluster maintenance state")
output_lines.append("# TYPE pvc_cluster_maintenance gauge")
output_lines.append(
f"pvc_cluster_maintenance {1 if bool(strtobool(status_data['maintenance'])) else 0}"
)
output_lines.append("# HELP pvc_cluster_health PVC cluster health status")
output_lines.append("# TYPE pvc_cluster_health gauge")
output_lines.append(f"pvc_cluster_health {status_data['cluster_health']['health']}")
output_lines.append("# HELP pvc_cluster_faults PVC cluster new faults")
output_lines.append("# TYPE pvc_cluster_faults gauge")
fault_map = dict()
for fault_type in pvc_common.fault_state_combinations:
fault_map[fault_type] = 0
for fault in faults_data:
fault_map[fault["status"]] += 1
for fault_type in fault_map:
output_lines.append(
f'pvc_cluster_faults{{status="{fault_type}"}} {fault_map[fault_type]}'
)
# output_lines.append("# HELP pvc_cluster_faults PVC cluster health faults")
# output_lines.append("# TYPE pvc_cluster_faults gauge")
# for fault_msg in status_data["cluster_health"]["messages"]:
# output_lines.append(
# f"pvc_cluster_faults{{id=\"{fault_msg['id']}\", message=\"{fault_msg['text']}\"}} {fault_msg['health_delta']}"
# )
output_lines.append("# HELP pvc_node_health PVC cluster node health status")
output_lines.append("# TYPE pvc_node_health gauge")
for node in status_data["node_health"]:
if isinstance(status_data["node_health"][node]["health"], int):
output_lines.append(
f"pvc_node_health{{node=\"{node}\"}} {status_data['node_health'][node]['health']}"
)
output_lines.append("# HELP pvc_node_daemon_states PVC Node daemon state counts")
output_lines.append("# TYPE pvc_node_daemon_states gauge")
node_daemon_state_map = dict()
for state in set([s.split(",")[0] for s in pvc_common.node_state_combinations]):
node_daemon_state_map[state] = 0
for node in node_data:
node_daemon_state_map[node["daemon_state"]] += 1
for state in node_daemon_state_map:
output_lines.append(
f'pvc_node_daemon_states{{state="{state}"}} {node_daemon_state_map[state]}'
)
output_lines.append("# HELP pvc_node_domain_states PVC Node domain state counts")
output_lines.append("# TYPE pvc_node_domain_states gauge")
node_domain_state_map = dict()
for state in set([s.split(",")[1] for s in pvc_common.node_state_combinations]):
node_domain_state_map[state] = 0
for node in node_data:
node_domain_state_map[node["domain_state"]] += 1
for state in node_domain_state_map:
output_lines.append(
f'pvc_node_domain_states{{state="{state}"}} {node_domain_state_map[state]}'
)
output_lines.append("# HELP pvc_vm_states PVC VM state counts")
output_lines.append("# TYPE pvc_vm_states gauge")
vm_state_map = dict()
for state in set(pvc_common.vm_state_combinations):
vm_state_map[state] = 0
for vm in vm_data:
vm_state_map[vm["state"]] += 1
for state in vm_state_map:
output_lines.append(f'pvc_vm_states{{state="{state}"}} {vm_state_map[state]}')
output_lines.append("# HELP pvc_osd_up_states PVC OSD up state counts")
output_lines.append("# TYPE pvc_osd_up_states gauge")
osd_up_state_map = dict()
for state in set([s.split(",")[0] for s in pvc_common.ceph_osd_state_combinations]):
osd_up_state_map[state] = 0
for osd in osd_data:
if osd["stats"]["up"] > 0:
osd_up_state_map["up"] += 1
else:
osd_up_state_map["down"] += 1
for state in osd_up_state_map:
output_lines.append(
f'pvc_osd_up_states{{state="{state}"}} {osd_up_state_map[state]}'
)
output_lines.append("# HELP pvc_osd_in_states PVC OSD in state counts")
output_lines.append("# TYPE pvc_osd_in_states gauge")
osd_in_state_map = dict()
for state in set([s.split(",")[1] for s in pvc_common.ceph_osd_state_combinations]):
osd_in_state_map[state] = 0
for osd in osd_data:
if osd["stats"]["in"] > 0:
osd_in_state_map["in"] += 1
else:
osd_in_state_map["out"] += 1
for state in osd_in_state_map:
output_lines.append(
f'pvc_osd_in_states{{state="{state}"}} {osd_in_state_map[state]}'
)
output_lines.append("# HELP pvc_nodes PVC Node count")
output_lines.append("# TYPE pvc_nodes gauge")
output_lines.append(f"pvc_nodes {status_data['nodes']['total']}")
output_lines.append("# HELP pvc_vms PVC VM count")
output_lines.append("# TYPE pvc_vms gauge")
output_lines.append(f"pvc_vms {status_data['vms']['total']}")
output_lines.append("# HELP pvc_osds PVC OSD count")
output_lines.append("# TYPE pvc_osds gauge")
output_lines.append(f"pvc_osds {status_data['osds']['total']}")
output_lines.append("# HELP pvc_networks PVC Network count")
output_lines.append("# TYPE pvc_networks gauge")
output_lines.append(f"pvc_networks {status_data['networks']}")
output_lines.append("# HELP pvc_pools PVC Storage Pool count")
output_lines.append("# TYPE pvc_pools gauge")
output_lines.append(f"pvc_pools {status_data['pools']}")
output_lines.append("# HELP pvc_volumes PVC Storage Volume count")
output_lines.append("# TYPE pvc_volumes gauge")
output_lines.append(f"pvc_volumes {status_data['volumes']}")
output_lines.append("# HELP pvc_snapshots PVC Storage Snapshot count")
output_lines.append("# TYPE pvc_snapshots gauge")
output_lines.append(f"pvc_snapshots {status_data['snapshots']}")
return "\n".join(output_lines) + "\n", 200
@pvc_common.Profiler(config)
@ZKConnection(config)
def ceph_metrics(zkhandler):
"""
Obtain current Ceph Prometheus metrics from the active MGR
"""
# We have to parse out the *name* of the currently active MGR.
# While the JSON version of the "ceph status" output provides a
# URL, this URL is on the backend (i.e. storage) network, which
# the API might not have access to. Connecting by node name instead
# lets the environment resolve it however is appropriate.
retcode, retdata = pvc_ceph.get_status(zkhandler)
if not retcode:
ceph_mgr_node = None
else:
ceph_data = retdata["ceph_data"]
try:
ceph_mgr_line = [
n for n in ceph_data.split("\n") if match(r"^mgr:", n.strip())
][0]
ceph_mgr_node = ceph_mgr_line.split()[1].split("(")[0]
except Exception:
ceph_mgr_node = None
if ceph_mgr_node is not None:
# Get the data from the endpoint
# We use the default port of 9283
ceph_prometheus_uri = f"http://{ceph_mgr_node}:9283/metrics"
response = get(ceph_prometheus_uri)
if response.status_code == 200:
output = response.text
status_code = 200
else:
output = (
f"Error: Failed to obtain metric data from {ceph_mgr_node} MGR daemon\n"
)
status_code = 400
else:
output = "Error: Failed to find an active MGR node\n"
status_code = 400
return output, status_code
#
# Fault functions
#
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_list(zkhandler, limit=None, sort_key="last_reported"):
"""
Return a list of all faults sorted by SORT_KEY.
"""
retflag, retdata = pvc_faults.get_list(zkhandler, limit=limit, sort_key=sort_key)
if retflag and limit is not None and len(retdata) < 1:
retcode = 404
retdata = {"message": f"No fault with ID {limit} found"}
elif retflag:
retcode = 200
else:
retcode = 400
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_acknowledge(zkhandler, fault_id):
"""
Acknowledge a fault of FAULT_ID.
"""
retflag, retdata = pvc_faults.acknowledge(zkhandler, fault_id=fault_id)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_acknowledge_all(zkhandler):
"""
Acknowledge all faults.
"""
retflag, retdata = pvc_faults.acknowledge(zkhandler)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_delete(zkhandler, fault_id):
"""
Delete a fault of FAULT_ID.
"""
retflag, retdata = pvc_faults.delete(zkhandler, fault_id=fault_id)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_delete_all(zkhandler):
"""
Delete all faults.
"""
retflag, retdata = pvc_faults.delete(zkhandler)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
#
# Node functions
#

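The cluster_metrics helper above emits standard Prometheus text exposition format. A truncated sample of its output might look as follows; the metric names and labels are taken from the code, while the values are illustrative only.

# HELP pvc_info PVC cluster information
# TYPE pvc_info gauge
pvc_info{primary_node="hv1", version="0.9.x", upstream_ip="10.0.0.254"} 1
# HELP pvc_cluster_health PVC cluster health status
# TYPE pvc_cluster_health gauge
pvc_cluster_health 90
# HELP pvc_cluster_faults PVC cluster new faults
# TYPE pvc_cluster_faults gauge
pvc_cluster_faults{status="new"} 1
pvc_cluster_faults{status="ack"} 0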
View File

@ -50,7 +50,7 @@ def open_database(config):
conn = psycopg2.connect(
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)

View File

@ -37,6 +37,7 @@ from pvc.cli.parsers import *
from pvc.cli.formatters import *
import pvc.lib.cluster
import pvc.lib.faults
import pvc.lib.node
import pvc.lib.vm
import pvc.lib.network
@ -347,40 +348,6 @@ def cli_cluster():
pass
###############################################################################
# > pvc cluster status
###############################################################################
@click.command(
name="status",
short_help="Show cluster status.",
)
@connection_req
@format_opt(
{
"pretty": cli_cluster_status_format_pretty,
"short": cli_cluster_status_format_short,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_cluster_status(
format_function,
):
"""
Show information and health about a PVC cluster.
\b
Format options:
"pretty": Output all details in a nice colourful format.
"short" Output only details about cluster health in a nice colourful format.
"json": Output in unformatted JSON.
"json-pretty": Output in formatted JSON.
"""
retcode, retdata = pvc.lib.cluster.get_info(CLI_CONFIG)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster init
###############################################################################
@ -485,6 +452,157 @@ def cli_cluster_restore(
"""
###############################################################################
# > pvc cluster status
###############################################################################
@click.command(
name="status",
short_help="Show cluster status.",
)
@connection_req
@format_opt(
{
"pretty": cli_cluster_status_format_pretty,
"short": cli_cluster_status_format_short,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_cluster_status(
format_function,
):
"""
Show information and health about a PVC cluster.
\b
Format options:
"pretty": Output all details in a nice colourful format.
"short" Output only details about cluster health in a nice colourful format.
"json": Output in unformatted JSON.
"json-pretty": Output in formatted JSON.
"""
retcode, retdata = pvc.lib.cluster.get_info(CLI_CONFIG)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster fault
###############################################################################
@click.group(
name="fault",
short_help="Manage PVC cluster faults.",
context_settings=CONTEXT_SETTINGS,
)
def cli_cluster_fault():
"""
Manage faults in the PVC cluster.
"""
pass
###############################################################################
# > pvc cluster fault list
###############################################################################
@click.command(
name="list",
short_help="List all cluster faults.",
)
@click.argument("limit", default=None, required=False)
@format_opt(
{
"short": cli_cluster_fault_list_format_short,
"long": cli_cluster_fault_list_format_long,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
},
default_format="short",
)
@connection_req
def cli_cluster_fault_list(limit, format_function):
"""
List all faults in the PVC cluster, optionally limited to fault ID LIMIT.
"""
retcode, retdata = pvc.lib.faults.get_list(
CLI_CONFIG,
limit=limit,
)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster fault ack
###############################################################################
@click.command(
name="ack",
short_help="Acknowledge a cluster fault.",
)
@click.argument("fault_id")
@connection_req
def cli_cluster_fault_acknowledge(fault_id):
"""
Acknowledge the cluster fault FAULT_ID.
"""
retcode, retdata = pvc.lib.faults.acknowledge(CLI_CONFIG, fault_id)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault ack-all
###############################################################################
@click.command(
name="ack-all",
short_help="Acknowledge all cluster faults.",
)
@confirm_opt("Acknowledge all current cluster faults")
@connection_req
def cli_cluster_fault_acknowledge_all():
"""
Acknowledge all cluster faults.
"""
retcode, retdata = pvc.lib.faults.acknowledge_all(CLI_CONFIG)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault delete
###############################################################################
@click.command(
name="delete",
short_help="Delete a cluster fault.",
)
@click.argument("fault_id")
@connection_req
def cli_cluster_fault_delete(fault_id):
"""
Delete the cluster fault FAULT_ID.
"""
retcode, retdata = pvc.lib.faults.delete(CLI_CONFIG, fault_id)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault delete-all
###############################################################################
@click.command(
name="delete-all",
short_help="Delete all cluster faults.",
)
@confirm_opt("Delete all current cluster faults")
@connection_req
def cli_cluster_fault_delete_all():
"""
Delete all cluster faults.
"""
retcode, retdata = pvc.lib.faults.delete_all(CLI_CONFIG)
finish(retcode, retdata)
###############################################################################
# > pvc cluster maintenance
###############################################################################
@ -6170,10 +6288,16 @@ cli_provisioner_profile.add_command(cli_provisioner_profile_list)
cli_provisioner.add_command(cli_provisioner_profile)
cli_provisioner.add_command(cli_provisioner_create)
cli.add_command(cli_provisioner)
cli_cluster.add_command(cli_cluster_status)
cli_cluster.add_command(cli_cluster_init)
cli_cluster.add_command(cli_cluster_backup)
cli_cluster.add_command(cli_cluster_restore)
cli_cluster.add_command(cli_cluster_status)
cli_cluster_fault.add_command(cli_cluster_fault_list)
cli_cluster_fault.add_command(cli_cluster_fault_acknowledge)
cli_cluster_fault.add_command(cli_cluster_fault_acknowledge_all)
cli_cluster_fault.add_command(cli_cluster_fault_delete)
cli_cluster_fault.add_command(cli_cluster_fault_delete_all)
cli_cluster.add_command(cli_cluster_fault)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_on)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_off)
cli_cluster.add_command(cli_cluster_maintenance)
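Taken together, the command group registered above yields the following CLI surface; this is a hypothetical session, and the --format flag spelling and the fault ID are assumptions based on the decorators and examples used elsewhere in this diff.

$ pvc cluster fault list                  # short-format table (the default)
$ pvc cluster fault list --format long    # long format with full messages
$ pvc cluster fault ack 10ae144b          # acknowledge one fault by ID
$ pvc cluster fault ack-all               # prompts for confirmation first
$ pvc cluster fault delete 10ae144b       # delete one fault by ID
$ pvc cluster fault delete-all            # prompts for confirmation first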

View File

@ -19,6 +19,7 @@
#
###############################################################################
from pvc.cli.helpers import MAX_CONTENT_WIDTH
from pvc.lib.node import format_info as node_format_info
from pvc.lib.node import format_list as node_format_list
from pvc.lib.vm import format_vm_tags as vm_format_tags
@ -96,6 +97,11 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
output.append(f"{ansii['bold']}PVC cluster status:{ansii['end']}")
output.append("")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append("")
if health != "-1":
health = f"{health}%"
else:
@ -105,18 +111,33 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Cluster health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
messages = "\n ".join(sorted(messages))
output.append(f"{ansii['purple']}Health messages:{ansii['end']} {messages}")
message_list = list()
for message in messages:
if message["health_delta"] >= 50:
message_colour = ansii["red"]
elif message["health_delta"] >= 10:
message_colour = ansii["yellow"]
else:
message_colour = ansii["green"]
message_delta = (
f"({message_colour}-{message['health_delta']}%{ansii['end']})"
)
message_list.append(
# width 15 to account for the invisible ANSI colour sequences
"{id} {delta:<15} {text}".format(
id=message["id"],
delta=message_delta,
text=message["text"],
)
)
output.append("")
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append("")
node_states = ["run,ready"]
@ -145,7 +166,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
nodes_string = ", ".join(nodes_strings)
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
vm_states = ["start", "disable"]
vm_states.extend(
@ -175,7 +196,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
vms_string = ", ".join(vms_strings)
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
osd_states = ["up,in"]
osd_states.extend(
@ -201,15 +222,15 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
osds_string = " ".join(osds_strings)
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append("")
@ -249,18 +270,332 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Cluster health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
messages = "\n ".join(sorted(messages))
output.append(f"{ansii['purple']}Health messages:{ansii['end']} {messages}")
message_list = list()
for message in messages:
if message["health_delta"] >= 50:
message_colour = ansii["red"]
elif message["health_delta"] >= 10:
message_colour = ansii["yellow"]
else:
message_colour = ansii["green"]
message_delta = (
f"({message_colour}-{message['health_delta']}%{ansii['end']})"
)
message_list.append(
# width 15 to account for the invisible ANSI colour sequences
"{id} {delta:<15} {text}".format(
id=message["id"],
delta=message_delta,
text=message["text"],
)
)
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
output.append("")
return "\n".join(output)
def cli_cluster_fault_list_format_short(CLI_CONFIG, fault_data):
"""
Short pretty format the output of cli_cluster_fault_list
"""
fault_list_output = []
# Determine optimal column widths
fault_id_length = 3 # "ID"
fault_status_length = 7 # "Status"
fault_last_reported_length = 14 # "Last Reported"
fault_health_delta_length = 7 # "Health"
fault_message_length = 8 # "Message"
for fault in fault_data:
# fault_id column
_fault_id_length = len(str(fault["id"])) + 1
if _fault_id_length > fault_id_length:
fault_id_length = _fault_id_length
# status column
_fault_status_length = len(str(fault["status"])) + 1
if _fault_status_length > fault_status_length:
fault_status_length = _fault_status_length
# health_delta column
_fault_health_delta_length = len(str(fault["health_delta"])) + 1
if _fault_health_delta_length > fault_health_delta_length:
fault_health_delta_length = _fault_health_delta_length
# last_reported column
_fault_last_reported_length = len(str(fault["last_reported"])) + 1
if _fault_last_reported_length > fault_last_reported_length:
fault_last_reported_length = _fault_last_reported_length
message_prefix_len = (
fault_id_length
+ 1
+ fault_status_length
+ 1
+ fault_health_delta_length
+ 1
+ fault_last_reported_length
+ 1
)
message_length = MAX_CONTENT_WIDTH - message_prefix_len
if fault_message_length > message_length:
fault_message_length = message_length + 1
# Handle splitting fault messages into separate lines based on width
formatted_messages = dict()
for fault in fault_data:
split_message = list()
if len(fault["message"]) > message_length:
words = fault["message"].split()
current_line = words[0]
for word in words[1:]:
if len(current_line) + len(word) + 1 < message_length:
current_line = f"{current_line} {word}"
else:
split_message.append(current_line)
current_line = word
split_message.append(current_line)
for line in split_message:
# message column
_fault_message_length = len(line) + 1
if _fault_message_length > fault_message_length:
fault_message_length = _fault_message_length
message = f"\n{' ' * message_prefix_len}".join(split_message)
else:
message = fault["message"]
# message column
_fault_message_length = len(message) + 1
if _fault_message_length > fault_message_length:
fault_message_length = _fault_message_length
formatted_messages[fault["id"]] = message
meta_header_length = (
fault_id_length + fault_status_length + fault_health_delta_length + 2
)
detail_header_length = (
fault_health_delta_length
+ fault_status_length
+ fault_last_reported_length
+ fault_message_length
+ 3
- meta_header_length
+ 8
)
# Format the string (header)
fault_list_output.append(
"{bold}Meta {meta_dashes} Fault {detail_dashes}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
meta_dashes="-" * (meta_header_length - len("Meta ")),
detail_dashes="-" * (detail_header_length - len("Fault ")),
)
)
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_message}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_last_reported_length=fault_last_reported_length,
fault_id="ID",
fault_status="Status",
fault_health_delta="Health",
fault_last_reported="Last Reported",
fault_message="Message",
)
)
for fault in sorted(
fault_data,
key=lambda x: (x["health_delta"], x["last_reported"]),
reverse=True,
):
health_delta = fault["health_delta"]
if fault["acknowledged_at"] != "":
health_colour = ansii["blue"]
elif health_delta >= 50:
health_colour = ansii["red"]
elif health_delta >= 10:
health_colour = ansii["yellow"]
else:
health_colour = ansii["green"]
if len(fault["message"]) > message_length:
words = fault["message"].split()
split_message = list()
current_line = words[0]
for word in words[1:]:
if len(current_line) + len(word) + 1 < message_length:
current_line = f"{current_line} {word}"
else:
split_message.append(current_line)
current_line = word
split_message.append(current_line)
message = f"\n{' ' * message_prefix_len}".join(split_message)
else:
message = fault["message"]
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {health_colour}{fault_health_delta: <{fault_health_delta_length}}{end_colour} {fault_last_reported: <{fault_last_reported_length}} {fault_message}{end_bold}".format(
bold="",
end_bold="",
health_colour=health_colour,
end_colour=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_last_reported_length=fault_last_reported_length,
fault_id=fault["id"],
fault_status=fault["status"],
fault_health_delta=f"-{fault['health_delta']}%",
fault_last_reported=fault["last_reported"],
fault_message=formatted_messages[fault["id"]],
)
)
return "\n".join(fault_list_output)
def cli_cluster_fault_list_format_long(CLI_CONFIG, fault_data):
"""
Pretty format the output of cli_cluster_fault_list
"""
fault_list_output = []
# Determine optimal column widths
fault_id_length = 3 # "ID"
fault_status_length = 7 # "Status"
fault_health_delta_length = 7 # "Health"
fault_acknowledged_at_length = 9 # "Ack'd On"
fault_last_reported_length = 14 # "Last Reported"
fault_first_reported_length = 15 # "First Reported"
# Message goes on its own line
for fault in fault_data:
# fault_id column
_fault_id_length = len(str(fault["id"])) + 1
if _fault_id_length > fault_id_length:
fault_id_length = _fault_id_length
# status column
_fault_status_length = len(str(fault["status"])) + 1
if _fault_status_length > fault_status_length:
fault_status_length = _fault_status_length
# health_delta column
_fault_health_delta_length = len(str(fault["health_delta"])) + 1
if _fault_health_delta_length > fault_health_delta_length:
fault_health_delta_length = _fault_health_delta_length
# acknowledged_at column
_fault_acknowledged_at_length = len(str(fault["acknowledged_at"])) + 1
if _fault_acknowledged_at_length > fault_acknowledged_at_length:
fault_acknowledged_at_length = _fault_acknowledged_at_length
# last_reported column
_fault_last_reported_length = len(str(fault["last_reported"])) + 1
if _fault_last_reported_length > fault_last_reported_length:
fault_last_reported_length = _fault_last_reported_length
# first_reported column
_fault_first_reported_length = len(str(fault["first_reported"])) + 1
if _fault_first_reported_length > fault_first_reported_length:
fault_first_reported_length = _fault_first_reported_length
# Format the string (header)
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}} {fault_acknowledged_at: <{fault_acknowledged_at_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_first_reported: <{fault_first_reported_length}}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_acknowledged_at_length=fault_acknowledged_at_length,
fault_last_reported_length=fault_last_reported_length,
fault_first_reported_length=fault_first_reported_length,
fault_id="ID",
fault_status="Status",
fault_health_delta="Health",
fault_acknowledged_at="Ack'd On",
fault_last_reported="Last Reported",
fault_first_reported="First Reported",
)
)
fault_list_output.append(
"{bold}> {fault_message}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_message="Message",
)
)
for fault in sorted(
fault_data,
key=lambda x: (x["status"], x["health_delta"], x["last_reported"]),
reverse=True,
):
health_delta = fault["health_delta"]
if fault["acknowledged_at"] != "":
health_colour = ansii["blue"]
elif health_delta >= 50:
health_colour = ansii["red"]
elif health_delta >= 10:
health_colour = ansii["yellow"]
else:
health_colour = ansii["green"]
fault_list_output.append("")
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {health_colour}{fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}}{end_colour} {fault_acknowledged_at: <{fault_acknowledged_at_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_first_reported: <{fault_first_reported_length}}{end_bold}".format(
bold="",
end_bold="",
health_colour=health_colour,
end_colour=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_acknowledged_at_length=fault_acknowledged_at_length,
fault_last_reported_length=fault_last_reported_length,
fault_first_reported_length=fault_first_reported_length,
fault_id=fault["id"],
fault_status=fault["status"].title(),
fault_health_delta=f"-{fault['health_delta']}%",
fault_acknowledged_at=fault["acknowledged_at"]
if fault["acknowledged_at"] != ""
else "N/A",
fault_last_reported=fault["last_reported"],
fault_first_reported=fault["first_reported"],
)
)
fault_list_output.append(
"> {fault_message}".format(
fault_message=fault["message"],
)
)
return "\n".join(fault_list_output)
def cli_cluster_task_format_pretty(CLI_CONFIG, task_data):
"""
Pretty format the output of cli_cluster_task

View File

@ -26,7 +26,7 @@ from distutils.util import strtobool
from getpass import getuser
from json import load as jload
from json import dump as jdump
from os import chmod, environ, getpid, path, makedirs
from os import chmod, environ, getpid, path, makedirs, get_terminal_size
from re import findall
from socket import gethostname
from subprocess import run, PIPE
@ -45,7 +45,13 @@ DEFAULT_STORE_FILENAME = "pvc.json"
DEFAULT_API_PREFIX = "/api/v1"
DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0]
DEFAULT_AUTOBACKUP_FILENAME = "/etc/pvc/pvc.conf"
MAX_CONTENT_WIDTH = 120
try:
# Define the content width to be the maximum terminal size
MAX_CONTENT_WIDTH = get_terminal_size().columns - 1
except OSError:
# Fall back to 80 columns if "Inappropriate ioctl for device"
MAX_CONTENT_WIDTH = 80
def echo(config, message, newline=True, stderr=False):

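The try/except above matters because get_terminal_size() raises OSError ("Inappropriate ioctl for device") whenever stdout is not attached to a real terminal, e.g. in a pipe or a non-interactive SSH session. A minimal sketch reproducing both paths:

from os import get_terminal_size

try:
    # Works when stdout is attached to a terminal
    width = get_terminal_size().columns - 1
except OSError:
    # No controlling terminal, e.g. when run as `python3 width.py | cat`
    width = 80

print(width)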
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python3
# faults.py - PVC CLI client function library, faults management
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from pvc.lib.common import call_api
def get_list(config, limit=None, sort_key="last_reported"):
"""
Get list of PVC faults
API endpoint: GET /api/v1/faults
API arguments: sort_key={sort_key}
API schema: {json_data_object}
"""
if limit is not None:
params = {}
endpoint = f"/faults/{limit}"
else:
params = {"sort_key": sort_key}
endpoint = "/faults"
response = call_api(config, "get", endpoint, params=params)
if response.status_code == 200:
return True, response.json()
else:
return False, response.json().get("message", "")
def acknowledge(config, fault_id):
"""
Acknowledge a PVC fault
API endpoint: PUT /api/v1/faults/<fault_id>
API arguments:
API schema: {json_message}
"""
response = call_api(config, "put", f"/faults/{fault_id}")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def acknowledge_all(config):
"""
Acknowledge all PVC faults
API endpoint: PUT /api/v1/faults
API arguments:
API schema: {json_message}
"""
response = call_api(config, "put", "/faults")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def delete(config, fault_id):
"""
Delete a PVC fault
API endpoint: DELETE /api/v1/faults/<fault_id>
API arguments:
API schema: {json_message}
"""
response = call_api(config, "delete", f"/faults/{fault_id}")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def delete_all(config):
"""
Delete all PVC faults
API endpoint: DELETE /api/v1/faults
API arguments:
API schema: {json_message}
"""
response = call_api(config, "delete", "/faults")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")

View File

@ -279,7 +279,7 @@ def unset_osd(zkhandler, option):
return True, 'Unset OSD property "{}".'.format(option)
def get_list_osd(zkhandler, limit, is_fuzzy=True):
def get_list_osd(zkhandler, limit=None, is_fuzzy=True):
osd_list = []
full_osd_list = zkhandler.children("base.osd")
@ -472,7 +472,7 @@ def set_pgs_pool(zkhandler, name, pgs):
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
def get_list_pool(zkhandler, limit, is_fuzzy=True):
def get_list_pool(zkhandler, limit=None, is_fuzzy=True):
full_pool_list = zkhandler.children("base.pool")
if is_fuzzy and limit:
@ -830,7 +830,7 @@ def unmap_volume(zkhandler, pool, name):
return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)
def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
def get_list_volume(zkhandler, pool, limit=None, is_fuzzy=True):
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
pool
@ -1034,7 +1034,7 @@ def remove_snapshot(zkhandler, pool, volume, name):
)
def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
def get_list_snapshot(zkhandler, pool, volume, limit=None, is_fuzzy=True):
snapshot_list = []
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(

View File

@ -22,6 +22,7 @@
from json import loads
import daemon_lib.common as common
import daemon_lib.faults as faults
import daemon_lib.vm as pvc_vm
import daemon_lib.node as pvc_node
import daemon_lib.network as pvc_network
@ -44,6 +45,39 @@ def set_maintenance(zkhandler, maint_state):
return True, "Successfully set cluster in normal mode"
def getClusterHealthFromFaults(zkhandler):
faults_list = faults.getAllFaults(zkhandler)
unacknowledged_faults = [fault for fault in faults_list if fault["status"] != "ack"]
# Generate total cluster health numbers
cluster_health_value = 100
cluster_health_messages = list()
for fault in sorted(
unacknowledged_faults,
key=lambda x: (x["health_delta"], x["last_reported"]),
reverse=True,
):
cluster_health_value -= fault["health_delta"]
message = {
"id": fault["id"],
"health_delta": fault["health_delta"],
"text": fault["message"],
}
cluster_health_messages.append(message)
if cluster_health_value < 0:
cluster_health_value = 0
cluster_health = {
"health": cluster_health_value,
"messages": cluster_health_messages,
}
return cluster_health
def getClusterHealth(zkhandler, node_list, vm_list, ceph_osd_list):
health_delta_map = {
"node_stopped": 50,
@ -240,51 +274,9 @@ def getClusterInformation(zkhandler):
ceph_volume_count = len(ceph_volume_list)
ceph_snapshot_count = len(ceph_snapshot_list)
# State lists
node_state_combinations = [
"run,ready",
"run,flush",
"run,flushed",
"run,unflush",
"init,ready",
"init,flush",
"init,flushed",
"init,unflush",
"stop,ready",
"stop,flush",
"stop,flushed",
"stop,unflush",
"dead,ready",
"dead,flush",
"dead,fence-flush",
"dead,flushed",
"dead,unflush",
"fenced,ready",
"fenced,flush",
"fenced,flushed",
"fenced,unflush",
]
vm_state_combinations = [
"start",
"restart",
"shutdown",
"stop",
"disable",
"fail",
"migrate",
"unmigrate",
"provision",
]
ceph_osd_state_combinations = [
"up,in",
"up,out",
"down,in",
"down,out",
]
# Format the Node states
formatted_node_states = {"total": node_count}
for state in node_state_combinations:
for state in common.node_state_combinations:
state_count = 0
for node in node_list:
node_state = f"{node['daemon_state']},{node['domain_state']}"
@ -295,7 +287,7 @@ def getClusterInformation(zkhandler):
# Format the VM states
formatted_vm_states = {"total": vm_count}
for state in vm_state_combinations:
for state in common.vm_state_combinations:
state_count = 0
for vm in vm_list:
if vm["state"] == state:
@ -307,7 +299,7 @@ def getClusterInformation(zkhandler):
up_texts = {1: "up", 0: "down"}
in_texts = {1: "in", 0: "out"}
formatted_osd_states = {"total": ceph_osd_count}
for state in ceph_osd_state_combinations:
for state in common.ceph_osd_state_combinations:
state_count = 0
for ceph_osd in ceph_osd_list:
ceph_osd_state = f"{up_texts[ceph_osd['stats']['up']]},{in_texts[ceph_osd['stats']['in']]}"
@ -318,9 +310,7 @@ def getClusterInformation(zkhandler):
# Format the status data
cluster_information = {
"cluster_health": getClusterHealth(
zkhandler, node_list, vm_list, ceph_osd_list
),
"cluster_health": getClusterHealthFromFaults(zkhandler),
"node_health": getNodeHealth(zkhandler, node_list),
"maintenance": maintenance_state,
"primary_node": primary_node,

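getClusterHealthFromFaults above replaces the old weighted-state calculation: health starts at 100, every unacknowledged fault subtracts its health_delta, and the result is clamped at 0. A minimal restatement, using hypothetical fault data shaped like the entries getAllFaults returns:

# Hypothetical fault entries
faults_list = [
    {"id": "10ae144b", "status": "new", "health_delta": 50},
    {"id": "8594eb69", "status": "ack", "health_delta": 25},
    {"id": "3dc48c17", "status": "new", "health_delta": 10},
]

health = 100
for fault in faults_list:
    if fault["status"] != "ack":  # acknowledged faults are excluded
        health -= fault["health_delta"]
health = max(health, 0)  # clamp at zero, as the real function does

print(health)  # 40: only the two unacknowledged faults (50 + 10) count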
View File

@ -34,6 +34,58 @@ from shlex import split as shlex_split
from functools import wraps
###############################################################################
# Global Variables
###############################################################################
# State lists
fault_state_combinations = [
"new",
"ack",
]
node_state_combinations = [
"run,ready",
"run,flush",
"run,flushed",
"run,unflush",
"init,ready",
"init,flush",
"init,flushed",
"init,unflush",
"stop,ready",
"stop,flush",
"stop,flushed",
"stop,unflush",
"dead,ready",
"dead,flush",
"dead,fence-flush",
"dead,flushed",
"dead,unflush",
"fenced,ready",
"fenced,flush",
"fenced,flushed",
"fenced,unflush",
]
vm_state_combinations = [
"start",
"restart",
"shutdown",
"stop",
"disable",
"fail",
"migrate",
"unmigrate",
"provision",
]
ceph_osd_state_combinations = [
"up,in",
"up,out",
"down,in",
"down,out",
]
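# A hedged sketch of how these shared lists are consumed: callers such as the
# cluster information formatter or a metrics exporter can pre-seed every
# combination with zero so that empty states are still reported. The helper
# below is illustrative only.
def _zero_filled_state_counts(combinations, observed_states):
    # Start every known combination at 0, then count what was actually seen
    counts = {state: 0 for state in combinations}
    for state in observed_states:
        if state in counts:
            counts[state] += 1
    return counts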
###############################################################################
# Performance Profiler decorator
###############################################################################


@ -284,7 +284,7 @@ def get_parsed_configuration(config_file):
config_timer = {
"vm_shutdown_timeout": int(o_timer.get("vm_shutdown_timeout", 180)),
"keepalive_interval": int(o_timer.get("keepalive_interval", 5)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 60)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 15)),
}
config = {**config, **config_timer}

daemon-common/faults.py Normal file

@ -0,0 +1,221 @@
#!/usr/bin/env python3
# faults.py - PVC client function library, faults management
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from datetime import datetime
from hashlib import md5
def generate_fault(
zkhandler,
logger,
fault_name,
fault_time,
fault_delta,
fault_message,
fault_details=None,
):
# Generate a fault ID from the fault_name, fault_delta, and fault_message
fault_str = f"{fault_name} {fault_delta} {fault_message}"
fault_id = str(md5(fault_str.encode("utf-8")).hexdigest())[:8]
# Strip the microseconds off of the fault time; we don't care about that precision
fault_time = str(fault_time).split(".")[0]
if fault_details is not None:
fault_message = f"{fault_message}: {fault_details}"
# Abort if the faults schema is not yet present in Zookeeper (e.g. mid-upgrade)
if not zkhandler.exists("base.faults"):
logger.out(
f"Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
state="w",
)
return
existing_faults = zkhandler.children("base.faults")
if fault_id in existing_faults:
logger.out(
f"Updating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
)
else:
logger.out(
f"Generating fault {fault_id}: {fault_message} @ {fault_time}",
state="i",
)
if zkhandler.read("base.config.maintenance") == "true":
logger.out(
f"Skipping fault reporting for {fault_id} due to maintenance mode",
state="w",
)
return
# If a fault already exists with this ID, just update the last-seen time and message
if fault_id in existing_faults:
zkhandler.write(
[
(("faults.last_time", fault_id), fault_time),
(("faults.message", fault_id), fault_message),
]
)
# Otherwise, generate a new fault event
else:
zkhandler.write(
[
(("faults.id", fault_id), ""),
(("faults.first_time", fault_id), fault_time),
(("faults.last_time", fault_id), fault_time),
(("faults.ack_time", fault_id), ""),
(("faults.status", fault_id), "new"),
(("faults.delta", fault_id), fault_delta),
(("faults.message", fault_id), fault_message),
]
)
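# A hedged illustration of the ID derivation above: because fault_details is
# appended only after hashing, a fault whose details change still maps onto
# the same 8-character ID and is updated in place rather than duplicated.
# The helper name and example values are illustrative only.
def _fault_id_for(fault_name, fault_delta, fault_message):
    fault_str = f"{fault_name} {fault_delta} {fault_message}"
    return str(md5(fault_str.encode("utf-8")).hexdigest())[:8]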
def getFault(zkhandler, fault_id):
"""
Get the details of a fault based on the fault ID
"""
if not zkhandler.exists(("faults.id", fault_id)):
return None
fault_last_time = zkhandler.read(("faults.last_time", fault_id))
fault_first_time = zkhandler.read(("faults.first_time", fault_id))
fault_ack_time = zkhandler.read(("faults.ack_time", fault_id))
fault_status = zkhandler.read(("faults.status", fault_id))
fault_delta = int(zkhandler.read(("faults.delta", fault_id)))
fault_message = zkhandler.read(("faults.message", fault_id))
# Acknowledged faults have a delta of 0
if fault_ack_time != "":
fault_delta = 0
fault = {
"id": fault_id,
"last_reported": fault_last_time,
"first_reported": fault_first_time,
"acknowledged_at": fault_ack_time,
"status": fault_status,
"health_delta": fault_delta,
"message": fault_message,
}
return fault
def getAllFaults(zkhandler, sort_key="last_reported"):
"""
Get the details of all registered faults
"""
all_faults = zkhandler.children("base.faults")
faults_detail = list()
for fault_id in all_faults:
fault_detail = getFault(zkhandler, fault_id)
faults_detail.append(fault_detail)
sorted_faults = sorted(faults_detail, key=lambda x: x[sort_key])
# Sort newest-first for time-based sorts
if sort_key in ["first_reported", "last_reported", "acknowledge_at"]:
sorted_faults.reverse()
return sorted_faults
def get_list(zkhandler, limit=None, sort_key="last_reported"):
"""
Get a list of all known faults, sorted by {sort_key}
"""
if sort_key not in [
"first_reported",
"last_reported",
"acknowledged_at",
"status",
"health_delta",
"message",
]:
return False, f"Invalid sort key {sort_key} provided"
all_faults = getAllFaults(zkhandler, sort_key=sort_key)
if limit is not None:
all_faults = [fault for fault in all_faults if fault["id"] == limit]
return True, all_faults
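# Hedged usage examples (the fault ID and values shown are hypothetical):
#   get_list(zkhandler, sort_key="health_delta")
#     -> (True, [{"id": "1a2b3c4d", "status": "new", ...}, ...])
#   get_list(zkhandler, sort_key="bogus")
#     -> (False, "Invalid sort key bogus provided")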
def acknowledge(zkhandler, fault_id=None):
"""
Acknowledge a fault or all faults
"""
if fault_id is None:
faults = getAllFaults(zkhandler)
else:
fault = getFault(zkhandler, fault_id)
if fault is None:
return False, f"No fault with ID {fault_id} found"
faults = [fault]
for fault in faults:
# Don't reacknowledge already-acknowledged faults
if fault["status"] != "ack":
zkhandler.write(
[
(
("faults.ack_time", fault["id"]),
str(datetime.now()).split(".")[0],
),
(("faults.status", fault["id"]), "ack"),
]
)
return (
True,
f"Successfully acknowledged fault(s) {', '.join([fault['id'] for fault in faults])}",
)
def delete(zkhandler, fault_id=None):
"""
Delete a fault or all faults
"""
if fault_id is None:
faults = getAllFaults(zkhandler)
else:
fault = getFault(zkhandler, fault_id)
if fault is None:
return False, f"No fault with ID {fault_id} found"
faults = [fault]
for fault in faults:
zkhandler.delete(("faults.id", fault["id"]), recursive=True)
return (
True,
f"Successfully deleted fault(s) {', '.join([fault['id'] for fault in faults])}",
)
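# Hedged usage of the two mutators above (the fault ID is hypothetical):
#   acknowledge(zkhandler, fault_id="1a2b3c4d")  # ack one fault; its delta becomes 0
#   acknowledge(zkhandler)                       # acknowledge every registered fault
#   delete(zkhandler)                            # remove all fault entries entirely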


@ -0,0 +1 @@
{"version": "11", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "logs": "/logs", "faults": "/faults", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.health": "/ceph/health", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "faults": {"id": "", "last_time": "/last_time", "first_time": "/first_time", "ack_time": "/ack_time", "status": "/status", "delta": "/delta", "message": "/message"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data", "monitoring.health": "/monitoring_health"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", "data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", 
"ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "is_split": "/is_split", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}


@ -261,7 +261,7 @@ def get_info(zkhandler, node):
def get_list(
zkhandler,
limit,
limit=None,
daemon_state=None,
coordinator_state=None,
domain_state=None,


@ -167,7 +167,7 @@ def open_db(config):
conn = psycopg2.connect(
host=config["api_postgresql_host"],
port=config["api_postgresql_port"],
dbname=config["api_postgresql_name"],
dbname=config["api_postgresql_dbname"],
user=config["api_postgresql_user"],
password=config["api_postgresql_password"],
)
@ -335,7 +335,7 @@ def worker_create_vm(
monitor_list.append("{}.{}".format(monitor, config["storage_domain"]))
vm_data["ceph_monitor_list"] = monitor_list
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]
vm_data["ceph_monitor_secret"] = config["ceph_secret_uuid"]
# Parse the script arguments
script_arguments = dict()


@ -335,11 +335,11 @@ class ZKHandler(object):
try:
path = self.get_schema_path(key)
if path is None:
# This path is invalid; this is likely due to missing schema entries, so return None
return None
raise NoNodeError
return self.zk_conn.get_children(path)
except NoNodeError:
# This path is invalid; this is likely due to missing schema entries, so return None
return None
def rename(self, kkpairs):
@ -540,7 +540,7 @@ class ZKHandler(object):
#
class ZKSchema(object):
# Current version
_version = 10
_version = 11
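# Version 11 adds the /faults tree defined below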
# Root for doing nested keys
_schema_root = ""
@ -560,7 +560,8 @@ class ZKSchema(object):
"config.primary_node.sync_lock": f"{_schema_root}/config/primary_node/sync_lock",
"config.upstream_ip": f"{_schema_root}/config/upstream_ip",
"config.migration_target_selector": f"{_schema_root}/config/migration_target_selector",
"logs": "/logs",
"logs": f"{_schema_root}/logs",
"faults": f"{_schema_root}/faults",
"node": f"{_schema_root}/nodes",
"domain": f"{_schema_root}/domains",
"network": f"{_schema_root}/networks",
@ -577,6 +578,16 @@ class ZKSchema(object):
"node": "", # The root key
"messages": "/messages",
},
# The schema of an individual fault entry (/faults/{fault_id})
"faults": {
"id": "", # The root key
"last_time": "/last_time",
"first_time": "/first_time",
"ack_time": "/ack_time",
"status": "/status",
"delta": "/delta",
"message": "/message",
},
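# As a hedged example of key resolution: reading ("faults.last_time", "1a2b3c4d")
# maps to the ZK path /faults/1a2b3c4d/last_time (the fault ID is hypothetical)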
# The schema of an individual node entry (/nodes/{node_name})
"node": {
"name": "", # The root key
@ -619,7 +630,11 @@ class ZKSchema(object):
"runtime": "/runtime",
},
# The schema of an individual SR-IOV PF entry (/nodes/{node_name}/sriov/pf/{pf})
"sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, # The root key
"sriov_pf": {
"phy": "",
"mtu": "/mtu",
"vfcount": "/vfcount",
}, # The root key
# The schema of an individual SR-IOV VF entry (/nodes/{node_name}/sriov/vf/{vf})
"sriov_vf": {
"phy": "", # The root key
@ -665,7 +680,11 @@ class ZKSchema(object):
"migrate.sync_lock": "/migrate_sync_lock",
},
# The schema of an individual domain tag entry (/domains/{domain}/tags/{tag})
"tag": {"name": "", "type": "/type", "protected": "/protected"}, # The root key
"tag": {
"name": "",
"type": "/type",
"protected": "/protected",
}, # The root key
# The schema of an individual network entry (/networks/{vni})
"network": {
"vni": "", # The root key
@ -702,7 +721,11 @@ class ZKSchema(object):
"client_id": "/clientid",
},
# The schema for an individual network ACL entry (/networks/{vni}/firewall_rules/(in|out)/{acl}
"rule": {"description": "", "rule": "/rule", "order": "/order"}, # The root key
"rule": {
"description": "",
"rule": "/rule",
"order": "/order",
}, # The root key
# The schema of an individual OSD entry (/ceph/osds/{osd_id})
"osd": {
"id": "", # The root key
@ -726,9 +749,15 @@ class ZKSchema(object):
"stats": "/stats",
}, # The root key
# The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name})
"volume": {"name": "", "stats": "/stats"}, # The root key
"volume": {
"name": "",
"stats": "/stats",
}, # The root key
# The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name})
"snapshot": {"name": "", "stats": "/stats"}, # The root key
"snapshot": {
"name": "",
"stats": "/stats",
}, # The root key
}
# Properties

debian/control vendored

@ -8,7 +8,7 @@ X-Python3-Version: >= 3.7
Package: pvc-daemon-node
Architecture: all
Depends: systemd, pvc-daemon-common, pvc-daemon-health, pvc-daemon-worker, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Depends: systemd, pvc-daemon-common, pvc-daemon-health, pvc-daemon-worker, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, python3-prometheus-client, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Description: Parallel Virtual Cluster node daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
@ -16,7 +16,7 @@ Description: Parallel Virtual Cluster node daemon
Package: pvc-daemon-health
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml, python3-prometheus-client
Description: Parallel Virtual Cluster health daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
@ -24,7 +24,7 @@ Description: Parallel Virtual Cluster health daemon
Package: pvc-daemon-worker
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python-celery-common, fio
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-celery, python3-redis, python3-yaml, python3-prometheus-client, python-celery-common, fio
Description: Parallel Virtual Cluster worker daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
@ -32,7 +32,7 @@ Description: Parallel Virtual Cluster worker daemon
Package: pvc-daemon-api
Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, python3-prometheus-client
Description: Parallel Virtual Cluster API daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.


@ -55,7 +55,8 @@ class MonitoringPluginScript(MonitoringPlugin):
This step is optional and should be used sparingly.
"""
pass
# Prepare the last coordinator state
self.last_coordinator_state = None
def run(self, coordinator_state=None):
"""
@ -66,6 +67,8 @@ class MonitoringPluginScript(MonitoringPlugin):
# Run any imports first
from psycopg2 import connect
from json import loads as jloads
from daemon_lib.common import run_os_command
conn_api = None
cur_api = None
@ -77,7 +80,7 @@ class MonitoringPluginScript(MonitoringPlugin):
# Craft a message that can be used by the clients
message = "Successfully connected to PostgreSQL databases on localhost"
# Check the Metadata database (primary)
# Check the API database
try:
conn_api = connect(
host=self.this_node.name,
@ -99,34 +102,38 @@ class MonitoringPluginScript(MonitoringPlugin):
if conn_api is not None:
conn_api.close()
if health_delta == 0:
# Check the PowerDNS database (secondary)
try:
conn_pdns = connect(
host=self.this_node.name,
port=self.config["pdns_postgresql_port"],
dbname=self.config["pdns_postgresql_dbname"],
user=self.config["pdns_postgresql_user"],
password=self.config["pdns_postgresql_password"],
)
cur_pdns = conn_pdns.cursor()
cur_pdns.execute("""SELECT * FROM supermasters""")
data = cur_pdns.fetchone()
except Exception as e:
health_delta = 50
err = str(e).split('\n')[0]
message = f"Failed to connect to PostgreSQL database {self.config['pdns_postgresql_dbname']}: {err}"
finally:
if cur_pdns is not None:
cur_pdns.close()
if conn_pdns is not None:
conn_pdns.close()
# Check the Patroni status; patronictl returns a JSON list of member dicts
# with (at least) "Member" and "State" fields
_, stdout, _ = run_os_command(
    "patronictl --config-file /etc/patroni/config.yml list --format json"
)
patronictl_status = jloads(stdout)
# Find this node's entry, or None if it is absent; indexing a possibly-empty
# list with [0] would raise IndexError before the "not found" check below runs
this_node_patronictl_status = next(
    (p for p in patronictl_status if p["Member"] == self.this_node.name),
    None,
)
self.logger.out(
    f"{this_node_patronictl_status}, last node state: {self.last_coordinator_state}, current node state: {coordinator_state}",
    state="d",
)
# Invalid state, nothing returned; this is a fault
if health_delta == 0 and not this_node_patronictl_status:
health_delta = 10
message = "Unable to determine Patroni PostgreSQL node state"
# We want to check for a non-running Patroni, but not during or immediately after a coordinator
# transition. So we wait until 2 runs with the same coordinator state have been completed.
elif (
    health_delta == 0
    and self.last_coordinator_state == coordinator_state
    and this_node_patronictl_status["State"] != "running"
):
health_delta = 10
message = "Patroni PostgreSQL state is not running"
# Handle some exceptional cases
if health_delta > 0:
if coordinator_state in ["takeover", "relinquish"]:
# This scenario occurs if this plugin run catches a node transitioning from
# primary to secondary coordinator. We can ignore it.
health_delta = 0
message = "Patroni PostgreSQL error reported but currently transitioning coordinator state; ignoring."
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(message)
# Update the last coordinator state
self.last_coordinator_state = coordinator_state
# Return our local PluginResult object
return self.plugin_result


@ -25,9 +25,11 @@ import importlib.util
from os import walk
from datetime import datetime
from json import dumps
from json import dumps, loads
from apscheduler.schedulers.background import BackgroundScheduler
from daemon_lib.faults import generate_fault
class PluginError(Exception):
"""
@ -196,6 +198,125 @@ class MonitoringInstance(object):
self.config = config
self.logger = logger
self.this_node = this_node
self.faults = 0
# Create getter functions for each source of fault entries
def get_node_daemon_states():
node_daemon_states = [
{
"entry": node,
"check": self.zkhandler.read(("node.state.daemon", node)),
"details": None,
}
for node in self.zkhandler.children("base.node")
]
return node_daemon_states
def get_osd_in_states():
osd_in_states = [
{
"entry": osd,
"check": loads(self.zkhandler.read(("osd.stats", osd))).get(
"in", 0
),
"details": None,
}
for osd in self.zkhandler.children("base.osd")
]
return osd_in_states
def get_ceph_health_entries():
ceph_health_entries = [
{
"entry": f"{value['severity']} {key}",
"check": value["severity"],
"details": value["summary"]["message"],
}
for key, value in loads(self.zkhandler.read("base.storage.health"))[
"checks"
].items()
]
return ceph_health_entries
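# For reference, the health JSON parsed here looks roughly like the following
# (shape assumed from Ceph's health reporting; only the fields read above are shown):
#   {"checks": {"OSDMAP_FLAGS": {"severity": "HEALTH_WARN",
#                                "summary": {"message": "noout flag(s) set"}}}}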
def get_vm_states():
vm_states = [
{
"entry": self.zkhandler.read(("domain.name", domain)),
"check": self.zkhandler.read(("domain.state", domain)),
"details": self.zkhandler.read(("domain.failed_reason", domain)),
}
for domain in self.zkhandler.children("base.domain")
]
return vm_states
def get_overprovisioned_memory():
all_nodes = self.zkhandler.children("base.node")
current_memory_provisioned = sum(
[
int(self.zkhandler.read(("node.memory.allocated", node)))
for node in all_nodes
]
)
node_memory_totals = [
int(self.zkhandler.read(("node.memory.total", node)))
for node in all_nodes
]
total_node_memory = sum(node_memory_totals)
most_node_memory = sorted(node_memory_totals)[-1]
available_node_memory = total_node_memory - most_node_memory
if current_memory_provisioned >= available_node_memory:
op_str = "overprovisioned"
else:
op_str = "ok"
overprovisioned_memory = [
{
"entry": "Cluster memory was overprovisioned",
"check": op_str,
"details": f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)",
}
]
return overprovisioned_memory
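# A hedged sanity check of the N-1 arithmetic above, with made-up numbers:
# three nodes of 65536 MB each give total_node_memory = 196608 MB and
# available_node_memory = 196608 - 65536 = 131072 MB, so a cluster with
# 150000 MB provisioned would be flagged as overprovisioned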
# This is a map of all possible faults (cluster error messages) and their corresponding details
self.cluster_faults_map = {
"dead_or_fenced_node": {
"entries": get_node_daemon_states,
"conditions": ["dead", "fenced"],
"delta": 50,
"message": "Node {entry} was dead and/or fenced",
},
"ceph_osd_out": {
"entries": get_osd_in_states,
"conditions": ["0"],
"delta": 50,
"message": "OSD {entry} was marked out",
},
"ceph_warn": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_WARN"],
"delta": 10,
"message": "{entry} reported by Ceph cluster",
},
"ceph_err": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_ERR"],
"delta": 50,
"message": "{entry} reported by Ceph cluster",
},
"vm_failed": {
"entries": get_vm_states,
"conditions": ["fail"],
"delta": 10,
"message": "VM {entry} was failed",
},
"memory_overprovisioned": {
"entries": get_overprovisioned_memory,
"conditions": ["overprovisioned"],
"delta": 50,
"message": "{entry}",
},
}
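# Each entry above pairs an entries() getter with the condition values that
# constitute a fault, the cluster-wide health delta to apply, and a message
# template; run_faults() below walks this map on every check run while the
# node is a coordinator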
# Get a list of plugins from the plugin_directory
plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
@ -344,38 +465,84 @@ class MonitoringInstance(object):
)
)
self.run_plugins()
self.start_check_timer()
self.start_timer()
def __del__(self):
self.shutdown()
def shutdown(self):
self.stop_check_timer()
self.stop_timer()
self.run_cleanups()
return
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
def start_timer(self):
check_interval = int(self.config["monitoring_interval"])
self.timer = BackgroundScheduler()
self.timer.add_job(
self.run_checks,
trigger="interval",
seconds=check_interval,
)
self.logger.out(
f"Starting monitoring check timer ({check_interval} second interval)",
state="s",
)
self.check_timer = BackgroundScheduler()
self.check_timer.add_job(
self.run_plugins,
trigger="interval",
seconds=check_interval,
)
self.check_timer.start()
self.timer.start()
def stop_check_timer(self):
self.run_checks()
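# Note: BackgroundScheduler fires run_checks in a background thread every
# monitoring_interval seconds (15 by default after this change), so checks
# do not block the daemon's main loop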
def stop_timer(self):
try:
self.check_timer.shutdown()
self.logger.out("Stopping monitoring check timer", state="s")
self.timer.shutdown()
except Exception:
self.logger.out("Failed to stop monitoring check timer", state="w")
def run_faults(self, coordinator_state=None):
self.logger.out(
f"Starting cluster fault check run at {datetime.now()}",
state="t",
)
for fault_type in self.cluster_faults_map.keys():
fault_data = self.cluster_faults_map[fault_type]
if self.config["log_monitoring_details"] or self.config["debug"]:
self.logger.out(
f"Running fault check {fault_type}",
state="t",
)
entries = fault_data["entries"]()
if self.config["debug"]:
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
for _entry in entries:
entry = _entry["entry"]
check = _entry["check"]
details = _entry["details"]
for condition in fault_data["conditions"]:
if str(condition) == str(check):
fault_time = datetime.now()
fault_delta = fault_data["delta"]
fault_message = fault_data["message"].format(entry=entry)
generate_fault(
self.zkhandler,
self.logger,
fault_type,
fault_time,
fault_delta,
fault_message,
fault_details=details,
)
self.faults += 1
def run_plugin(self, plugin):
time_start = datetime.now()
try:
@ -394,19 +561,9 @@ class MonitoringInstance(object):
result.to_zookeeper()
return result
def run_plugins(self):
if self.this_node.coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif self.this_node.coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
def run_plugins(self, coordinator_state=None):
self.logger.out(
"Starting monitoring healthcheck run",
f"Starting node plugin check run at {datetime.now()}",
state="t",
)
@ -427,7 +584,33 @@ class MonitoringInstance(object):
state="t",
prefix=f"{result.plugin_name} ({result.runtime}s)",
)
total_health -= result.health_delta
# Generate a cluster fault if the plugin is in a suboptimal state
if result.health_delta > 0:
fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
fault_time = datetime.now()
# Map our check results to fault results.
# These are not 1-to-1, as faults are cluster-wide: the delta is halved so
# that two nodes reporting the same problem add up to roughly the original
# node-level delta instead of double-counting it.
fault_delta = int(result.health_delta / 2)
fault_message = (
f"{self.this_node.name} {result.plugin_name}: {result.message}"
)
generate_fault(
self.zkhandler,
self.logger,
fault_type,
fault_time,
fault_delta,
fault_message,
fault_details=None,
)
self.faults += 1
total_health -= result.health_delta
if total_health < 0:
total_health = 0
@ -441,38 +624,6 @@ class MonitoringInstance(object):
]
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
time.sleep(0.2)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = str(self.this_node.health) + "%"
else:
health_colour = self.logger.fmt_blue
health_text = "N/A"
self.logger.out(
"{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
health_colour=health_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
health=health_text,
runtime=runtime,
),
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()
@ -494,3 +645,66 @@ class MonitoringInstance(object):
),
]
)
def run_checks(self):
self.faults = 0
runtime_start = datetime.now()
coordinator_state = self.this_node.coordinator_state
if coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
self.run_plugins(coordinator_state=coordinator_state)
if coordinator_state in ["primary", "secondary", "takeover", "relinquish"]:
self.run_faults(coordinator_state=coordinator_state)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
result_text = list()
if coordinator_state in ["primary", "secondary", "takeover", "relinquish"]:
if self.faults > 0:
fault_colour = self.logger.fmt_red
else:
fault_colour = self.logger.fmt_green
if self.faults != 1:
s = "s"
else:
s = ""
fault_text = f"{fault_colour}{self.faults}{self.logger.fmt_end} fault{s}"
result_text.append(fault_text)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = f"{health_colour}{self.this_node.health}%{self.logger.fmt_end} node health"
result_text.append(health_text)
else:
health_text = "{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
result_text.append(health_text)
self.logger.out(
"{start_colour}{hostname} health check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {result_text} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=coordinator_state,
runtime=runtime,
result_text=", ".join(result_text),
),
state="t",
)


@ -216,7 +216,7 @@ timer:
keepalive_interval: 5
# Monitoring interval (seconds)
monitoring_interval: 60
monitoring_interval: 15
# Fencing configuration
fencing: