Compare commits

...

33 Commits

SHA1 Message Date
82a7fd3c80 Add more debugging info to psql 2023-12-07 21:36:05 -05:00
ddd9d9ee07 Adjust psql check to avoid weird failures 2023-12-07 15:07:59 -05:00
9e2e749c55 Combine pvchealthd output into single log message 2023-12-07 14:00:43 -05:00
157b8c20bf Add Patroni output to debug logs 2023-12-07 14:00:35 -05:00
bf158dc2d9 Shorten debug output 2023-12-07 13:31:20 -05:00
1b84553405 Use passed coordinator state 2023-12-07 11:19:26 -05:00
60dac143f2 Use simpler health calculation 2023-12-07 11:17:31 -05:00
a13273335d Add colon to result text 2023-12-07 11:15:42 -05:00
e7f21b7058 Enhance and fix bugs in psql plugin
1. Check Patronictl statuses
2. Don't error during node primary transitions
2023-12-07 11:14:16 -05:00
9dbadfdd6e Move back to per-plugin fault reporting 2023-12-07 11:13:56 -05:00
61b39d0739 Fix incorrect cluster health calculation 2023-12-07 11:13:36 -05:00
4bf80a5913 Fix missing datetime shrink 2023-12-06 17:15:36 -05:00
6c0dfe16cf Improve word splitting for fault messages
This ensures that fault messages are split on word boundaries and that
the column width matches the longest resulting line where applicable.
2023-12-06 17:10:19 -05:00
3fde494fc5 Add status back to short fault list 2023-12-06 16:53:23 -05:00
0945b3faf3 Use same fault formatting for short and long 2023-12-06 16:19:44 -05:00
1416f9edc0 Remove bad sort values 2023-12-06 14:38:29 -05:00
5691f75ac9 Fix bad import 2023-12-06 14:28:32 -05:00
e0bf7f7d1a Fix bad ID values in acknowledge 2023-12-06 14:18:31 -05:00
0c34c88a1f Fix bad dict key name 2023-12-06 14:16:19 -05:00
20acf3295f Add mass ack/delete of faults 2023-12-06 13:59:39 -05:00
4a02c2c8e3 Add additional faults 2023-12-06 13:27:39 -05:00
6fc5c927a1 Properly sort status faults 2023-12-06 13:27:18 -05:00
d1e34e7333 Store fault times only to the second
Any more precision is unnecessary, and dropping it saves 6 characters
when displaying these times elsewhere.
2023-12-06 13:20:18 -05:00
79eb54d5da Move fault generation to common library 2023-12-06 13:17:10 -05:00
536fb2080f Fix get_terminal_size over SSH 2023-12-06 13:11:28 -05:00
2267a9c85d Improve output formatting for simplicity 2023-12-05 10:37:35 -05:00
067e73337f Shorten health IDs to 8 characters 2023-12-04 15:48:27 -05:00
672e58133f Implement interfaces to faults 2023-12-04 01:37:54 -05:00
b59f743690 Improve logging and handling of fault entries 2023-12-01 17:38:28 -05:00
4c3f235e05 Avoid running fault updates in maintenance mode
When the cluster is in maintenance mode, all faults should be ignored.
2023-12-01 17:38:28 -05:00
3dc48c1783 Lower default monitoring interval to 15s
Faults are also reported on the monitoring interval, so 60s seems too
long. Lower the default to 15 seconds instead.
2023-12-01 17:38:28 -05:00
9c2b1b29ee Add node health to fault states
Adjusts ordering and ensures that node health states are included in
faults if they are less than 50%.

Also adjusts fault ID generation and runs fault checks only on
coordinator nodes to avoid excessive runs.
2023-12-01 17:38:28 -05:00
8594eb697f Add initial fault generation in pvchealthd
References: #164
2023-12-01 17:38:27 -05:00
14 changed files with 1500 additions and 152 deletions

View File

@ -622,6 +622,190 @@ class API_Status(Resource):
api.add_resource(API_Status, "/status")
# /faults
class API_Faults(Resource):
@RequestParser(
[
{
"name": "sort_key",
"choices": (
"first_reported",
"last_reported",
"acknowledged_at",
"status",
"health_delta",
"message",
),
"helptext": "A valid sort key must be specified",
"required": False,
},
]
)
@Authenticator
def get(self, reqargs):
"""
Return a list of cluster faults
---
tags:
- faults
parameters:
- in: query
name: sort_key
type: string
required: false
description: The fault object key to sort results by
enum:
- first_reported
- last_reported
- acknowledged_at
- status
- health_delta
- message
responses:
200:
description: OK
schema:
type: array
items:
type: object
id: fault
properties:
id:
type: string
description: The ID of the fault
example: "10ae144b78b4cc5fdf09e2ebbac51235"
first_reported:
type: date
description: The first time the fault was reported
example: "2023-12-01 16:47:59.849742"
last_reported:
type: date
description: The last time the fault was reported
example: "2023-12-01 17:39:45.188398"
acknowledged_at:
type: date
description: The time the fault was acknowledged, or empty if not acknowledged
example: "2023-12-01 17:50:00.000000"
status:
type: string
description: The current state of the fault, either "new" or "ack" (acknowledged)
example: "new"
health_delta:
type: integer
description: The health delta (amount it reduces cluster health from 100%) of the fault
example: 25
message:
type: string
description: The textual description of the fault
example: "Node hv1 was at 40% (psur@-10%, psql@-50%) <= 50% health"
"""
return api_helper.fault_list(sort_key=reqargs.get("sort_key", "last_reported"))
@Authenticator
def put(self):
"""
Acknowledge all cluster faults
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_acknowledge_all()
@Authenticator
def delete(self):
"""
Delete all cluster faults
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_delete_all()
api.add_resource(API_Faults, "/faults")
# /faults/<fault_id>
class API_Faults_Element(Resource):
@Authenticator
def get(self, fault_id):
"""
Return a single cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: array
items:
type: object
id: fault
$ref: '#/definitions/fault'
"""
return api_helper.fault_list(limit=fault_id)
@Authenticator
def put(self, fault_id):
"""
Acknowledge a cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_acknowledge(fault_id)
@Authenticator
def delete(self, fault_id):
"""
Delete a cluster fault
---
tags:
- faults
responses:
200:
description: OK
schema:
type: object
properties:
message:
type: string
description: A text message
"""
return api_helper.fault_delete(fault_id)
api.add_resource(API_Faults_Element, "/faults/<fault_id>")
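Together these two resources give the faults API its full verb set: list and sort, fetch one, acknowledge one or all, and delete one or all. A minimal sketch of driving it with the requests library; the host, port, and authentication header are assumptions to be matched to your deployment:

import requests

API = "http://pvc.local:7370/api/v1"  # hypothetical address and port
HEADERS = {"X-Api-Key": "secret"}  # header name assumed; match your deployment

# List all faults, worst-first
faults = requests.get(
    f"{API}/faults", params={"sort_key": "health_delta"}, headers=HEADERS
).json()
for fault in faults:
    print(fault["id"], fault["status"], f"-{fault['health_delta']}%", fault["message"])

# Acknowledge the first fault, then delete all faults
if faults:
    requests.put(f"{API}/faults/{faults[0]['id']}", headers=HEADERS)
requests.delete(f"{API}/faults", headers=HEADERS)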
# /tasks
class API_Tasks(Resource):
@Authenticator

View File

@ -31,6 +31,7 @@ from daemon_lib.zkhandler import ZKConnection
import daemon_lib.common as pvc_common
import daemon_lib.cluster as pvc_cluster
import daemon_lib.faults as pvc_faults
import daemon_lib.node as pvc_node
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
@ -118,6 +119,101 @@ def cluster_maintenance(zkhandler, maint_state="false"):
return retdata, retcode
#
# Fault functions
#
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_list(zkhandler, limit=None, sort_key="last_reported"):
"""
Return a list of all faults sorted by SORT_KEY.
"""
retflag, retdata = pvc_faults.get_list(zkhandler, limit=limit, sort_key=sort_key)
if retflag and limit is not None and len(retdata) < 1:
retcode = 404
retdata = {"message": f"No fault with ID {limit} found"}
elif retflag:
retcode = 200
else:
retcode = 400
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_acknowledge(zkhandler, fault_id):
"""
Acknowledge a fault of FAULT_ID.
"""
retflag, retdata = pvc_faults.acknowledge(zkhandler, fault_id=fault_id)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_acknowledge_all(zkhandler):
"""
Acknowledge all faults.
"""
retflag, retdata = pvc_faults.acknowledge(zkhandler)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_delete(zkhandler, fault_id):
"""
Delete a fault of FAULT_ID.
"""
retflag, retdata = pvc_faults.delete(zkhandler, fault_id=fault_id)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def fault_delete_all(zkhandler):
"""
Delete all faults.
"""
retflag, retdata = pvc_faults.delete(zkhandler)
if retflag:
retcode = 200
else:
retcode = 404
retdata = {"message": retdata}
return retdata, retcode
#
# Node functions
#

View File

@ -37,6 +37,7 @@ from pvc.cli.parsers import *
from pvc.cli.formatters import *
import pvc.lib.cluster
import pvc.lib.faults
import pvc.lib.node
import pvc.lib.vm
import pvc.lib.network
@ -347,40 +348,6 @@ def cli_cluster():
pass
###############################################################################
# > pvc cluster status
###############################################################################
@click.command(
name="status",
short_help="Show cluster status.",
)
@connection_req
@format_opt(
{
"pretty": cli_cluster_status_format_pretty,
"short": cli_cluster_status_format_short,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_cluster_status(
format_function,
):
"""
Show information and health about a PVC cluster.
\b
Format options:
"pretty": Output all details in a nice colourful format.
"short" Output only details about cluster health in a nice colourful format.
"json": Output in unformatted JSON.
"json-pretty": Output in formatted JSON.
"""
retcode, retdata = pvc.lib.cluster.get_info(CLI_CONFIG)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster init
###############################################################################
@ -485,6 +452,157 @@ def cli_cluster_restore(
"""
###############################################################################
# > pvc cluster status
###############################################################################
@click.command(
name="status",
short_help="Show cluster status.",
)
@connection_req
@format_opt(
{
"pretty": cli_cluster_status_format_pretty,
"short": cli_cluster_status_format_short,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_cluster_status(
format_function,
):
"""
Show information and health about a PVC cluster.
\b
Format options:
"pretty": Output all details in a nice colourful format.
"short" Output only details about cluster health in a nice colourful format.
"json": Output in unformatted JSON.
"json-pretty": Output in formatted JSON.
"""
retcode, retdata = pvc.lib.cluster.get_info(CLI_CONFIG)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster fault
###############################################################################
@click.group(
name="fault",
short_help="Manage PVC cluster faults.",
context_settings=CONTEXT_SETTINGS,
)
def cli_cluster_fault():
"""
Manage faults in the PVC cluster.
"""
pass
###############################################################################
# > pvc cluster fault list
###############################################################################
@click.command(
name="list",
short_help="List all cluster faults.",
)
@click.argument("limit", default=None, required=False)
@format_opt(
{
"short": cli_cluster_fault_list_format_short,
"long": cli_cluster_fault_list_format_long,
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
},
default_format="short",
)
@connection_req
def cli_cluster_fault_list(limit, format_function):
"""
List all faults in the PVC cluster, optionally limited to fault ID LIMIT.
"""
retcode, retdata = pvc.lib.faults.get_list(
CLI_CONFIG,
limit=limit,
)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc cluster fault ack
###############################################################################
@click.command(
name="ack",
short_help="Acknowledge a cluster fault.",
)
@click.argument("fault_id")
@connection_req
def cli_cluster_fault_acknowledge(fault_id):
"""
Acknowledge the cluster fault FAULT_ID.
"""
retcode, retdata = pvc.lib.faults.acknowledge(CLI_CONFIG, fault_id)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault ack-all
###############################################################################
@click.command(
name="ack-all",
short_help="Acknowledge all cluster faults.",
)
@confirm_opt("Acknowledge all current cluster faults")
@connection_req
def cli_cluster_fault_acknowledge_all():
"""
Acknowledge all cluster faults.
"""
retcode, retdata = pvc.lib.faults.acknowledge_all(CLI_CONFIG)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault delete
###############################################################################
@click.command(
name="delete",
short_help="Delete a cluster fault.",
)
@click.argument("fault_id")
@connection_req
def cli_cluster_fault_delete(fault_id):
"""
Delete the cluster fault FAULT_ID.
"""
retcode, retdata = pvc.lib.faults.delete(CLI_CONFIG, fault_id)
finish(retcode, retdata)
###############################################################################
# > pvc cluster fault delete-all
###############################################################################
@click.command(
name="delete-all",
short_help="Delete all cluster faults.",
)
@confirm_opt("Delete all current cluster faults")
@connection_req
def cli_cluster_fault_delete_all():
"""
Delete all cluster faults.
"""
retcode, retdata = pvc.lib.faults.delete_all(CLI_CONFIG)
finish(retcode, retdata)
###############################################################################
# > pvc cluster maintenance
###############################################################################
@ -6170,10 +6288,16 @@ cli_provisioner_profile.add_command(cli_provisioner_profile_list)
cli_provisioner.add_command(cli_provisioner_profile)
cli_provisioner.add_command(cli_provisioner_create)
cli.add_command(cli_provisioner)
cli_cluster.add_command(cli_cluster_status)
cli_cluster.add_command(cli_cluster_init)
cli_cluster.add_command(cli_cluster_backup)
cli_cluster.add_command(cli_cluster_restore)
cli_cluster.add_command(cli_cluster_status)
cli_cluster_fault.add_command(cli_cluster_fault_list)
cli_cluster_fault.add_command(cli_cluster_fault_acknowledge)
cli_cluster_fault.add_command(cli_cluster_fault_acknowledge_all)
cli_cluster_fault.add_command(cli_cluster_fault_delete)
cli_cluster_fault.add_command(cli_cluster_fault_delete_all)
cli_cluster.add_command(cli_cluster_fault)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_on)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_off)
cli_cluster.add_command(cli_cluster_maintenance)
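The new command group can also be smoke-tested without a terminal via click's test runner; a minimal sketch, assuming the CLI entrypoint is importable as pvc.cli.cli:cli and that format_opt registers a --format flag:

from click.testing import CliRunner

from pvc.cli.cli import cli  # entrypoint module path assumed

runner = CliRunner()
result = runner.invoke(cli, ["cluster", "fault", "list", "--format", "json"])
print(result.exit_code, result.output)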

View File

@ -19,6 +19,7 @@
#
###############################################################################
from pvc.cli.helpers import MAX_CONTENT_WIDTH
from pvc.lib.node import format_info as node_format_info
from pvc.lib.node import format_list as node_format_list
from pvc.lib.vm import format_vm_tags as vm_format_tags
@ -96,6 +97,11 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
output.append(f"{ansii['bold']}PVC cluster status:{ansii['end']}")
output.append("")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append("")
if health != "-1":
health = f"{health}%"
else:
@ -105,18 +111,33 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Cluster health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
messages = "\n ".join(sorted(messages))
output.append(f"{ansii['purple']}Health messages:{ansii['end']} {messages}")
message_list = list()
for message in messages:
if message["health_delta"] >= 50:
message_colour = ansii["red"]
elif message["health_delta"] >= 10:
message_colour = ansii["yellow"]
else:
message_colour = ansii["green"]
message_delta = (
f"({message_colour}-{message['health_delta']}%{ansii['end']})"
)
message_list.append(
# 15 length due to ANSI colour sequences
"{id} {delta:<15} {text}".format(
id=message["id"],
delta=message_delta,
text=message["text"],
)
)
output.append("")
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
output.append(f"{ansii['purple']}Primary node:{ansii['end']} {primary_node}")
output.append(f"{ansii['purple']}PVC version:{ansii['end']} {pvc_version}")
output.append(f"{ansii['purple']}Upstream IP:{ansii['end']} {upstream_ip}")
output.append("")
node_states = ["run,ready"]
@ -145,7 +166,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
nodes_string = ", ".join(nodes_strings)
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
output.append(f"{ansii['purple']}Nodes:{ansii['end']} {nodes_string}")
vm_states = ["start", "disable"]
vm_states.extend(
@ -175,7 +196,7 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
vms_string = ", ".join(vms_strings)
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
output.append(f"{ansii['purple']}VMs:{ansii['end']} {vms_string}")
osd_states = ["up,in"]
osd_states.extend(
@ -201,15 +222,15 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
osds_string = " ".join(osds_strings)
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}OSDs:{ansii['end']} {osds_string}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Pools:{ansii['end']} {total_pools}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Volumes:{ansii['end']} {total_volumes}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Snapshots:{ansii['end']} {total_snapshots}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append(f"{ansii['purple']}Networks:{ansii['end']} {total_networks}")
output.append("")
@ -249,18 +270,332 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
health = f"{health} (maintenance on)"
output.append(
f"{ansii['purple']}Cluster health:{ansii['end']} {health_colour}{health}{ansii['end']}"
f"{ansii['purple']}Health:{ansii['end']} {health_colour}{health}{ansii['end']}"
)
if messages is not None and len(messages) > 0:
messages = "\n ".join(sorted(messages))
output.append(f"{ansii['purple']}Health messages:{ansii['end']} {messages}")
message_list = list()
for message in messages:
if message["health_delta"] >= 50:
message_colour = ansii["red"]
elif message["health_delta"] >= 10:
message_colour = ansii["yellow"]
else:
message_colour = ansii["green"]
message_delta = (
f"({message_colour}-{message['health_delta']}%{ansii['end']})"
)
message_list.append(
# 15 length due to ANSI colour sequences
"{id} {delta:<15} {text}".format(
id=message["id"],
delta=message_delta,
text=message["text"],
)
)
messages = "\n ".join(message_list)
output.append(f"{ansii['purple']}Active Faults:{ansii['end']} {messages}")
output.append("")
return "\n".join(output)
def cli_cluster_fault_list_format_short(CLI_CONFIG, fault_data):
"""
Short pretty format the output of cli_cluster_fault_list
"""
fault_list_output = []
# Determine optimal column widths
fault_id_length = 3 # "ID"
fault_status_length = 7 # "Status"
fault_last_reported_length = 14 # "Last Reported"
fault_health_delta_length = 7 # "Health"
fault_message_length = 8 # "Message"
for fault in fault_data:
# fault_id column
_fault_id_length = len(str(fault["id"])) + 1
if _fault_id_length > fault_id_length:
fault_id_length = _fault_id_length
# status column
_fault_status_length = len(str(fault["status"])) + 1
if _fault_status_length > fault_status_length:
fault_status_length = _fault_status_length
# health_delta column
_fault_health_delta_length = len(str(fault["health_delta"])) + 1
if _fault_health_delta_length > fault_health_delta_length:
fault_health_delta_length = _fault_health_delta_length
# last_reported column
_fault_last_reported_length = len(str(fault["last_reported"])) + 1
if _fault_last_reported_length > fault_last_reported_length:
fault_last_reported_length = _fault_last_reported_length
message_prefix_len = (
fault_id_length
+ 1
+ fault_status_length
+ 1
+ fault_health_delta_length
+ 1
+ fault_last_reported_length
+ 1
)
message_length = MAX_CONTENT_WIDTH - message_prefix_len
if fault_message_length > message_length:
fault_message_length = message_length + 1
# Handle splitting fault messages into separate lines based on width
formatted_messages = dict()
for fault in fault_data:
split_message = list()
if len(fault["message"]) > message_length:
words = fault["message"].split()
current_line = words[0]
for word in words[1:]:
if len(current_line) + len(word) + 1 < message_length:
current_line = f"{current_line} {word}"
else:
split_message.append(current_line)
current_line = word
split_message.append(current_line)
for line in split_message:
# message column
_fault_message_length = len(line) + 1
if _fault_message_length > fault_message_length:
fault_message_length = _fault_message_length
message = f"\n{' ' * message_prefix_len}".join(split_message)
else:
message = fault["message"]
# message column
_fault_message_length = len(message) + 1
if _fault_message_length > fault_message_length:
fault_message_length = _fault_message_length
formatted_messages[fault["id"]] = message
meta_header_length = (
fault_id_length + fault_status_length + fault_health_delta_length + 2
)
detail_header_length = (
fault_health_delta_length
+ fault_status_length
+ fault_last_reported_length
+ fault_message_length
+ 3
- meta_header_length
+ 8
)
# Format the string (header)
fault_list_output.append(
"{bold}Meta {meta_dashes} Fault {detail_dashes}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
meta_dashes="-" * (meta_header_length - len("Meta ")),
detail_dashes="-" * (detail_header_length - len("Fault ")),
)
)
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_message}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_last_reported_length=fault_last_reported_length,
fault_id="ID",
fault_status="Status",
fault_health_delta="Health",
fault_last_reported="Last Reported",
fault_message="Message",
)
)
for fault in sorted(
fault_data,
key=lambda x: (x["health_delta"], x["last_reported"]),
reverse=True,
):
health_delta = fault["health_delta"]
if fault["acknowledged_at"] != "":
health_colour = ansii["blue"]
elif health_delta >= 50:
health_colour = ansii["red"]
elif health_delta >= 10:
health_colour = ansii["yellow"]
else:
health_colour = ansii["green"]
if len(fault["message"]) > message_length:
words = fault["message"].split()
split_message = list()
current_line = words[0]
for word in words[1:]:
if len(current_line) + len(word) + 1 < message_length:
current_line = f"{current_line} {word}"
else:
split_message.append(current_line)
current_line = word
split_message.append(current_line)
message = f"\n{' ' * message_prefix_len}".join(split_message)
else:
message = fault["message"]
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {health_colour}{fault_health_delta: <{fault_health_delta_length}}{end_colour} {fault_last_reported: <{fault_last_reported_length}} {fault_message}{end_bold}".format(
bold="",
end_bold="",
health_colour=health_colour,
end_colour=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_last_reported_length=fault_last_reported_length,
fault_id=fault["id"],
fault_status=fault["status"],
fault_health_delta=f"-{fault['health_delta']}%",
fault_last_reported=fault["last_reported"],
fault_message=formatted_messages[fault["id"]],
)
)
return "\n".join(fault_list_output)
def cli_cluster_fault_list_format_long(CLI_CONFIG, fault_data):
"""
Pretty format the output of cli_cluster_fault_list
"""
fault_list_output = []
# Determine optimal column widths
fault_id_length = 3 # "ID"
fault_status_length = 7 # "Status"
fault_health_delta_length = 7 # "Health"
fault_acknowledged_at_length = 9 # "Ack'd On"
fault_last_reported_length = 14 # "Last Reported"
fault_first_reported_length = 15 # "First Reported"
# Message goes on its own line
for fault in fault_data:
# fault_id column
_fault_id_length = len(str(fault["id"])) + 1
if _fault_id_length > fault_id_length:
fault_id_length = _fault_id_length
# status column
_fault_status_length = len(str(fault["status"])) + 1
if _fault_status_length > fault_status_length:
fault_status_length = _fault_status_length
# health_delta column
_fault_health_delta_length = len(str(fault["health_delta"])) + 1
if _fault_health_delta_length > fault_health_delta_length:
fault_health_delta_length = _fault_health_delta_length
# acknowledged_at column
_fault_acknowledged_at_length = len(str(fault["acknowledged_at"])) + 1
if _fault_acknowledged_at_length > fault_acknowledged_at_length:
fault_acknowledged_at_length = _fault_acknowledged_at_length
# last_reported column
_fault_last_reported_length = len(str(fault["last_reported"])) + 1
if _fault_last_reported_length > fault_last_reported_length:
fault_last_reported_length = _fault_last_reported_length
# first_reported column
_fault_first_reported_length = len(str(fault["first_reported"])) + 1
if _fault_first_reported_length > fault_first_reported_length:
fault_first_reported_length = _fault_first_reported_length
# Format the string (header)
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}} {fault_acknowledged_at: <{fault_acknowledged_at_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_first_reported: <{fault_first_reported_length}}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_acknowledged_at_length=fault_acknowledged_at_length,
fault_last_reported_length=fault_last_reported_length,
fault_first_reported_length=fault_first_reported_length,
fault_id="ID",
fault_status="Status",
fault_health_delta="Health",
fault_acknowledged_at="Ack'd On",
fault_last_reported="Last Reported",
fault_first_reported="First Reported",
)
)
fault_list_output.append(
"{bold}> {fault_message}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
fault_message="Message",
)
)
for fault in sorted(
fault_data,
key=lambda x: (x["status"], x["health_delta"], x["last_reported"]),
reverse=True,
):
health_delta = fault["health_delta"]
if fault["acknowledged_at"] != "":
health_colour = ansii["blue"]
elif health_delta >= 50:
health_colour = ansii["red"]
elif health_delta >= 10:
health_colour = ansii["yellow"]
else:
health_colour = ansii["green"]
fault_list_output.append("")
fault_list_output.append(
"{bold}{fault_id: <{fault_id_length}} {health_colour}{fault_status: <{fault_status_length}} {fault_health_delta: <{fault_health_delta_length}}{end_colour} {fault_acknowledged_at: <{fault_acknowledged_at_length}} {fault_last_reported: <{fault_last_reported_length}} {fault_first_reported: <{fault_first_reported_length}}{end_bold}".format(
bold="",
end_bold="",
health_colour=health_colour,
end_colour=ansii["end"],
fault_id_length=fault_id_length,
fault_status_length=fault_status_length,
fault_health_delta_length=fault_health_delta_length,
fault_acknowledged_at_length=fault_acknowledged_at_length,
fault_last_reported_length=fault_last_reported_length,
fault_first_reported_length=fault_first_reported_length,
fault_id=fault["id"],
fault_status=fault["status"].title(),
fault_health_delta=f"-{fault['health_delta']}%",
fault_acknowledged_at=fault["acknowledged_at"]
if fault["acknowledged_at"] != ""
else "N/A",
fault_last_reported=fault["last_reported"],
fault_first_reported=fault["first_reported"],
)
)
fault_list_output.append(
"> {fault_message}".format(
fault_message=fault["message"],
)
)
return "\n".join(fault_list_output)
def cli_cluster_task_format_pretty(CLI_CONFIG, task_data):
"""
Pretty format the output of cli_cluster_task

View File

@ -26,7 +26,7 @@ from distutils.util import strtobool
from getpass import getuser
from json import load as jload
from json import dump as jdump
from os import chmod, environ, getpid, path, makedirs
from os import chmod, environ, getpid, path, makedirs, get_terminal_size
from re import findall
from socket import gethostname
from subprocess import run, PIPE
@ -45,7 +45,13 @@ DEFAULT_STORE_FILENAME = "pvc.json"
DEFAULT_API_PREFIX = "/api/v1"
DEFAULT_NODE_HOSTNAME = gethostname().split(".")[0]
DEFAULT_AUTOBACKUP_FILENAME = "/etc/pvc/pvc.conf"
MAX_CONTENT_WIDTH = 120
try:
# Define the content width to be the maximum terminal size
MAX_CONTENT_WIDTH = get_terminal_size().columns - 1
except OSError:
# Fall back to 80 columns if "Inappropriate ioctl for device"
MAX_CONTENT_WIDTH = 80
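os.get_terminal_size() queries the stdout file descriptor, so it raises OSError ("Inappropriate ioctl for device") whenever output is piped or attached to a non-interactive SSH session; the fallback can be reproduced standalone:

from os import get_terminal_size

try:
    width = get_terminal_size().columns - 1
except OSError:
    width = 80  # no controlling terminal; use a safe default

print(width)  # run as `python3 width.py | cat` to hit the fallback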
def echo(config, message, newline=True, stderr=False):

View File

@ -0,0 +1,109 @@
#!/usr/bin/env python3
# faults.py - PVC CLI client function library, faults management
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from pvc.lib.common import call_api
def get_list(config, limit=None, sort_key="last_reported"):
"""
Get list of PVC faults
API endpoint: GET /api/v1/faults
API arguments: sort_key={sort_key}
API schema: {json_data_object}
"""
if limit is not None:
params = {}
endpoint = f"/faults/{limit}"
else:
params = {"sort_key": sort_key}
endpoint = "/faults"
response = call_api(config, "get", endpoint, params=params)
if response.status_code == 200:
return True, response.json()
else:
return False, response.json().get("message", "")
def acknowledge(config, fault_id):
"""
Acknowledge a PVC fault
API endpoint: PUT /api/v1/faults/<fault_id>
API arguments:
API schema: {json_message}
"""
response = call_api(config, "put", f"/faults/{fault_id}")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def acknowledge_all(config):
"""
Acknowledge all PVC faults
API endpoint: PUT /api/v1/faults
API arguments:
API schema: {json_message}
"""
response = call_api(config, "put", "/faults")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def delete(config, fault_id):
"""
Delete a PVC fault
API endpoint: DELETE /api/v1/faults/<fault_id>
API arguments:
API schema: {json_message}
"""
response = call_api(config, "delete", f"/faults/{fault_id}")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")
def delete_all(config):
"""
Delete all PVC faults
API endpoint: DELETE /api/v1/faults
API arguments:
API schema: {json_message}
"""
response = call_api(config, "delete", "/faults")
if response.status_code == 200:
return True, response.json().get("message", "")
else:
return False, response.json().get("message", "")

View File

@ -22,6 +22,7 @@
from json import loads
import daemon_lib.common as common
import daemon_lib.faults as faults
import daemon_lib.vm as pvc_vm
import daemon_lib.node as pvc_node
import daemon_lib.network as pvc_network
@ -44,6 +45,39 @@ def set_maintenance(zkhandler, maint_state):
return True, "Successfully set cluster in normal mode"
def getClusterHealthFromFaults(zkhandler):
faults_list = faults.getAllFaults(zkhandler)
unacknowledged_faults = [fault for fault in faults_list if fault["status"] != "ack"]
# Generate total cluster health numbers
cluster_health_value = 100
cluster_health_messages = list()
for fault in sorted(
unacknowledged_faults,
key=lambda x: (x["health_delta"], x["last_reported"]),
reverse=True,
):
cluster_health_value -= fault["health_delta"]
message = {
"id": fault["id"],
"health_delta": fault["health_delta"],
"text": fault["message"],
}
cluster_health_messages.append(message)
if cluster_health_value < 0:
cluster_health_value = 0
cluster_health = {
"health": cluster_health_value,
"messages": cluster_health_messages,
}
return cluster_health
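The resulting health figure is simply 100 minus the sum of unacknowledged health deltas, floored at zero; a worked example with hypothetical faults:

faults = [
    {"id": "82a7fd3c", "status": "new", "health_delta": 25},
    {"id": "9e2e749c", "status": "ack", "health_delta": 50},  # acknowledged, ignored
    {"id": "1b845534", "status": "new", "health_delta": 10},
]
health = 100 - sum(f["health_delta"] for f in faults if f["status"] != "ack")
print(max(health, 0))  # 65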
def getClusterHealth(zkhandler, node_list, vm_list, ceph_osd_list):
health_delta_map = {
"node_stopped": 50,
@ -318,9 +352,7 @@ def getClusterInformation(zkhandler):
# Format the status data
cluster_information = {
"cluster_health": getClusterHealth(
zkhandler, node_list, vm_list, ceph_osd_list
),
"cluster_health": getClusterHealthFromFaults(zkhandler),
"node_health": getNodeHealth(zkhandler, node_list),
"maintenance": maintenance_state,
"primary_node": primary_node,

View File

@ -284,7 +284,7 @@ def get_parsed_configuration(config_file):
config_timer = {
"vm_shutdown_timeout": int(o_timer.get("vm_shutdown_timeout", 180)),
"keepalive_interval": int(o_timer.get("keepalive_interval", 5)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 60)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 15)),
}
config = {**config, **config_timer}

daemon-common/faults.py (new file, 211 lines)
View File

@ -0,0 +1,211 @@
#!/usr/bin/env python3
# faults.py - PVC client function library, faults management
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from datetime import datetime
from hashlib import md5
def generate_fault(
zkhandler, logger, fault_name, fault_time, fault_delta, fault_message
):
# Generate a fault ID from the fault name, delta, and message
fault_str = f"{fault_name} {fault_delta} {fault_message}"
fault_id = str(md5(fault_str.encode("utf-8")).hexdigest())[:8]
# Strip the microseconds off of the fault time; we don't care about that precision
fault_time = str(fault_time).split(".")[0]
# Skip fault reporting if the faults schema is not yet present in Zookeeper
if not zkhandler.exists("base.faults"):
logger.out(
f"Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
state="w",
)
return
existing_faults = zkhandler.children("base.faults")
if fault_id in existing_faults:
logger.out(
f"Updating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
)
else:
logger.out(
f"Generating fault {fault_id}: {fault_message} @ {fault_time}",
state="i",
)
if zkhandler.read("base.config.maintenance") == "true":
logger.out(
f"Skipping fault reporting for {fault_id} due to maintenance mode",
state="w",
)
return
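# A fault with this ID already exists; update only its last reported time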
if fault_id in existing_faults:
zkhandler.write(
[
(("faults.last_time", fault_id), fault_time),
]
)
# Otherwise, generate a new fault event
else:
zkhandler.write(
[
(("faults.id", fault_id), ""),
(("faults.first_time", fault_id), fault_time),
(("faults.last_time", fault_id), fault_time),
(("faults.ack_time", fault_id), ""),
(("faults.status", fault_id), "new"),
(("faults.delta", fault_id), fault_delta),
(("faults.message", fault_id), fault_message),
]
)
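Fault IDs are deterministic: the same fault name, delta, and message always hash to the same 8-character ID, which is what lets repeated reports of one problem collapse into a single entry. A standalone sketch with a hypothetical fault:

from hashlib import md5

fault_str = "ceph_warn 10 HEALTH_WARN OSD_NEARFULL reported by Ceph"  # hypothetical
fault_id = md5(fault_str.encode("utf-8")).hexdigest()[:8]
print(fault_id)  # stable across repeated reports of the same fault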
def getFault(zkhandler, fault_id):
"""
Get the details of a fault based on the fault ID
"""
if not zkhandler.exists(("faults.id", fault_id)):
return None
fault_last_time = zkhandler.read(("faults.last_time", fault_id))
fault_first_time = zkhandler.read(("faults.first_time", fault_id))
fault_ack_time = zkhandler.read(("faults.ack_time", fault_id))
fault_status = zkhandler.read(("faults.status", fault_id))
fault_delta = int(zkhandler.read(("faults.delta", fault_id)))
fault_message = zkhandler.read(("faults.message", fault_id))
# Acknowledged faults have a delta of 0
if fault_ack_time != "":
fault_delta = 0
fault = {
"id": fault_id,
"last_reported": fault_last_time,
"first_reported": fault_first_time,
"acknowledged_at": fault_ack_time,
"status": fault_status,
"health_delta": fault_delta,
"message": fault_message,
}
return fault
def getAllFaults(zkhandler, sort_key="last_reported"):
"""
Get the details of all registered faults
"""
all_faults = zkhandler.children(("base.faults"))
faults_detail = list()
for fault_id in all_faults:
fault_detail = getFault(zkhandler, fault_id)
faults_detail.append(fault_detail)
sorted_faults = sorted(faults_detail, key=lambda x: x[sort_key])
# Sort newest-first for time-based sorts
if sort_key in ["first_reported", "last_reported", "acknowledge_at"]:
sorted_faults.reverse()
return sorted_faults
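Because the stored timestamps are zero-padded strings, a plain lexicographic sort orders them chronologically, and the reverse yields newest-first; a toy illustration:

faults = [
    {"id": "a", "last_reported": "2023-12-06 13:20:18"},
    {"id": "b", "last_reported": "2023-12-07 11:14:16"},
]
ordered = sorted(faults, key=lambda x: x["last_reported"])
ordered.reverse()  # newest first, as above
print([f["id"] for f in ordered])  # ['b', 'a']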
def get_list(zkhandler, limit=None, sort_key="last_reported"):
"""
Get a list of all known faults, sorted by {sort_key}
"""
if sort_key not in [
"first_reported",
"last_reported",
"acknowledged_at",
"status",
"health_delta",
"message",
]:
return False, f"Invalid sort key {sort_key} provided"
all_faults = getAllFaults(zkhandler, sort_key=sort_key)
if limit is not None:
all_faults = [fault for fault in all_faults if fault["id"] == limit]
return True, all_faults
def acknowledge(zkhandler, fault_id=None):
"""
Acknowledge a fault or all faults
"""
if fault_id is None:
faults = getAllFaults(zkhandler)
else:
fault = getFault(zkhandler, fault_id)
if fault is None:
return False, f"No fault with ID {fault_id} found"
faults = [fault]
for fault in faults:
# Don't reacknowledge already-acknowledged faults
if fault["status"] != "ack":
zkhandler.write(
[
(
("faults.ack_time", fault["id"]),
str(datetime.now()).split(".")[0],
),
(("faults.status", fault["id"]), "ack"),
]
)
return (
True,
f"Successfully acknowledged fault(s) {', '.join([fault['id'] for fault in faults])}",
)
def delete(zkhandler, fault_id=None):
"""
Delete a fault or all faults
"""
if fault_id is None:
faults = getAllFaults(zkhandler)
else:
fault = getFault(zkhandler, fault_id)
if fault is None:
return False, f"No fault with ID {fault_id} found"
faults = [fault]
for fault in faults:
zkhandler.delete(("faults.id", fault["id"]), recursive=True)
return (
True,
f"Successfully deleted fault(s) {', '.join([fault['id'] for fault in faults])}",
)

View File

@ -0,0 +1 @@
{"version": "11", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "logs": "/logs", "faults": "/faults", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.health": "/ceph/health", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "faults": {"id": "", "last_time": "/last_time", "first_time": "/first_time", "ack_time": "/ack_time", "status": "/status", "delta": "/delta", "message": "/message"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data", "monitoring.health": "/monitoring_health"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", "data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", 
"ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "is_split": "/is_split", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

View File

@ -335,11 +335,11 @@ class ZKHandler(object):
try:
path = self.get_schema_path(key)
if path is None:
# This path is invalid; this is likely due to missing schema entries, so return None
return None
raise NoNodeError
return self.zk_conn.get_children(path)
except NoNodeError:
# This path is invalid; this is likely due to missing schema entries, so return None
return None
def rename(self, kkpairs):
@ -540,7 +540,7 @@ class ZKHandler(object):
#
class ZKSchema(object):
# Current version
_version = 10
_version = 11
# Root for doing nested keys
_schema_root = ""
@ -560,7 +560,8 @@ class ZKSchema(object):
"config.primary_node.sync_lock": f"{_schema_root}/config/primary_node/sync_lock",
"config.upstream_ip": f"{_schema_root}/config/upstream_ip",
"config.migration_target_selector": f"{_schema_root}/config/migration_target_selector",
"logs": "/logs",
"logs": f"{_schema_root}/logs",
"faults": f"{_schema_root}/faults",
"node": f"{_schema_root}/nodes",
"domain": f"{_schema_root}/domains",
"network": f"{_schema_root}/networks",
@ -577,6 +578,16 @@ class ZKSchema(object):
"node": "", # The root key
"messages": "/messages",
},
# The schema of an individual fault entry (/faults/{id})
"faults": {
"id": "", # The root key
"last_time": "/last_time",
"first_time": "/first_time",
"ack_time": "/ack_time",
"status": "/status",
"delta": "/delta",
"message": "/message",
},
# The schema of an individual node entry (/nodes/{node_name})
"node": {
"name": "", # The root key
@ -619,7 +630,11 @@ class ZKSchema(object):
"runtime": "/runtime",
},
# The schema of an individual SR-IOV PF entry (/nodes/{node_name}/sriov/pf/{pf})
"sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, # The root key
"sriov_pf": {
"phy": "",
"mtu": "/mtu",
"vfcount": "/vfcount",
}, # The root key
# The schema of an individual SR-IOV VF entry (/nodes/{node_name}/sriov/vf/{vf})
"sriov_vf": {
"phy": "", # The root key
@ -665,7 +680,11 @@ class ZKSchema(object):
"migrate.sync_lock": "/migrate_sync_lock",
},
# The schema of an individual domain tag entry (/domains/{domain}/tags/{tag})
"tag": {"name": "", "type": "/type", "protected": "/protected"}, # The root key
"tag": {
"name": "",
"type": "/type",
"protected": "/protected",
}, # The root key
# The schema of an individual network entry (/networks/{vni})
"network": {
"vni": "", # The root key
@ -702,7 +721,11 @@ class ZKSchema(object):
"client_id": "/clientid",
},
# The schema for an individual network ACL entry (/networks/{vni}/firewall_rules/(in|out)/{acl}
"rule": {"description": "", "rule": "/rule", "order": "/order"}, # The root key
"rule": {
"description": "",
"rule": "/rule",
"order": "/order",
}, # The root key
# The schema of an individual OSD entry (/ceph/osds/{osd_id})
"osd": {
"id": "", # The root key
@ -726,9 +749,15 @@ class ZKSchema(object):
"stats": "/stats",
}, # The root key
# The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name})
"volume": {"name": "", "stats": "/stats"}, # The root key
"volume": {
"name": "",
"stats": "/stats",
}, # The root key
# The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name})
"snapshot": {"name": "", "stats": "/stats"}, # The root key
"snapshot": {
"name": "",
"stats": "/stats",
}, # The root key
}
# Properties

View File

@ -55,7 +55,8 @@ class MonitoringPluginScript(MonitoringPlugin):
This step is optional and should be used sparingly.
"""
pass
# Prepare the last coordinator state
self.last_coordinator_state = None
def run(self, coordinator_state=None):
"""
@ -66,6 +67,8 @@ class MonitoringPluginScript(MonitoringPlugin):
# Run any imports first
from psycopg2 import connect
from json import loads as jloads
from daemon_lib.common import run_os_command
conn_api = None
cur_api = None
@ -77,7 +80,7 @@ class MonitoringPluginScript(MonitoringPlugin):
# Craft a message that can be used by the clients
message = "Successfully connected to PostgreSQL databases on localhost"
# Check the Metadata database (primary)
# Check the API database
try:
conn_api = connect(
host=self.this_node.name,
@ -99,34 +102,38 @@ class MonitoringPluginScript(MonitoringPlugin):
if conn_api is not None:
conn_api.close()
if health_delta == 0:
# Check the PowerDNS database (secondary)
try:
conn_pdns = connect(
host=self.this_node.name,
port=self.config["pdns_postgresql_port"],
dbname=self.config["pdns_postgresql_dbname"],
user=self.config["pdns_postgresql_user"],
password=self.config["pdns_postgresql_password"],
)
cur_pdns = conn_pdns.cursor()
cur_pdns.execute("""SELECT * FROM supermasters""")
data = cur_pdns.fetchone()
except Exception as e:
health_delta = 50
err = str(e).split('\n')[0]
message = f"Failed to connect to PostgreSQL database {self.config['pdns_postgresql_dbname']}: {err}"
finally:
if cur_pdns is not None:
cur_pdns.close()
if conn_pdns is not None:
conn_pdns.close()
# Check for Patroni status
_, stdout, _ = run_os_command("patronictl --config-file /etc/patroni/config.yml list --format json")
patronictl_status = jloads(stdout)
this_node_patronictl_status = next((p for p in patronictl_status if p["Member"] == self.this_node.name), None)
self.logger.out(f"{this_node_patronictl_status}, last node state: {self.last_coordinator_state}, current node state: {coordinator_state}", state="d")
# Invalid state, nothing returned; this is a fault
if health_delta == 0 and not this_node_patronictl_status:
health_delta = 10
message = "Unable to determine Patroni PostgreSQL node state"
# We want to check for a non-running Patroni, but not during or immediately after a coordinator
# transition. So we wait until 2 runs with the same coordinator state have been completed.
elif health_delta == 0 and self.last_coordinator_state == coordinator_state and this_node_patronictl_status["State"] != "running":
health_delta = 10
message = "Patroni PostgreSQL state is not running"
# Handle some exceptional cases
if health_delta > 0:
if coordinator_state in ["takeover", "relinquish"]:
# This scenario occurs if this plugin run catches a node transitioning from primary to
# secondary coordinator. We can ignore it.
health_delta = 0
message = "Patroni PostgreSQL error reported but currently transitioning coordinator state; ignoring."
# Set the health delta in our local PluginResult object
self.plugin_result.set_health_delta(health_delta)
# Set the message in our local PluginResult object
self.plugin_result.set_message(message)
# Update the last coordinator state
self.last_coordinator_state = coordinator_state
# Return our local PluginResult object
return self.plugin_result
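The two-run debounce means a non-running Patroni is only reported once the coordinator state has been stable across two consecutive plugin runs; a toy trace with hypothetical states:

last_state = None
runs = [("takeover", "stopped"), ("primary", "stopped"), ("primary", "stopped")]
for state, patroni_state in runs:
    faulted = last_state == state and patroni_state != "running"
    print(state, "fault" if faulted else "skip")
    last_state = state
# Only the third run faults: the first has no prior state to compare against,
# and the second is the first run after the takeover transition.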

View File

@ -25,9 +25,11 @@ import importlib.util
from os import walk
from datetime import datetime
from json import dumps
from json import dumps, loads
from apscheduler.schedulers.background import BackgroundScheduler
from daemon_lib.faults import generate_fault
class PluginError(Exception):
"""
@ -196,6 +198,125 @@ class MonitoringInstance(object):
self.config = config
self.logger = logger
self.this_node = this_node
self.faults = 0
# Create functions for each fault type
def get_node_daemon_states():
node_daemon_states = [
{
"entry": node,
"check": self.zkhandler.read(("node.state.daemon", node)),
"details": "",
}
for node in self.zkhandler.children("base.node")
]
return node_daemon_states
def get_osd_in_states():
osd_in_states = [
{
"entry": osd,
"check": loads(self.zkhandler.read(("osd.stats", osd))).get(
"in", 0
),
"details": "",
}
for osd in self.zkhandler.children("base.osd")
]
return osd_in_states
def get_ceph_health_entries():
ceph_health_entries = [
{
"entry": f"{value['severity']} {key}",
"check": value["severity"],
"details": value["summary"]["message"],
}
for key, value in loads(self.zkhandler.read("base.storage.health"))[
"checks"
].items()
]
return ceph_health_entries
def get_vm_states():
vm_states = [
{
"entry": self.zkhandler.read(("domain.name", domain)),
"check": self.zkhandler.read(("domain.state", domain)),
"details": self.zkhandler.read(("domain.failed_reason", domain)),
}
for domain in self.zkhandler.children("base.domain")
]
return vm_states
def get_overprovisioned_memory():
all_nodes = self.zkhandler.children("base.node")
current_memory_provisioned = sum(
[
int(self.zkhandler.read(("node.memory.allocated", node)))
for node in all_nodes
]
)
node_memory_totals = [
int(self.zkhandler.read(("node.memory.total", node)))
for node in all_nodes
]
total_node_memory = sum(node_memory_totals)
most_node_memory = sorted(node_memory_totals)[-1]
available_node_memory = total_node_memory - most_node_memory
if current_memory_provisioned >= available_node_memory:
op_str = "overprovisioned"
else:
op_str = "ok"
overprovisioned_memory = [
{
"entry": f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)",
"check": op_str,
"details": "",
}
]
return overprovisioned_memory
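The overprovisioning check uses an N-1 budget: total cluster memory minus the single largest node, i.e. what would remain if the biggest node failed. A worked example with hypothetical node sizes:

node_memory_totals = [131072, 131072, 65536]  # MiB per node, hypothetical
available = sum(node_memory_totals) - max(node_memory_totals)  # 196608 MiB
provisioned = 200000  # MiB allocated to VMs
print("overprovisioned" if provisioned >= available else "ok")  # overprovisioned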
# This maps each possible fault type (cluster error condition) to its check entries and details
self.cluster_faults_map = {
"dead_or_fenced_node": {
"entries": get_node_daemon_states,
"conditions": ["dead", "fenced"],
"delta": 50,
"message": "Node {entry} was dead and/or fenced",
},
"ceph_osd_out": {
"entries": get_osd_in_states,
"conditions": ["0"],
"delta": 50,
"message": "OSD {entry} was marked out",
},
"ceph_warn": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_WARN"],
"delta": 10,
"message": "{entry} reported by Ceph ({details})",
},
"ceph_err": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_ERR"],
"delta": 50,
"message": "{entry} reported by Ceph ({details})",
},
"vm_failed": {
"entries": get_vm_states,
"conditions": ["fail"],
"delta": 10,
"message": "VM {entry} was failed ({details})",
},
"memory_overprovisioned": {
"entries": get_overprovisioned_memory,
"conditions": ["overprovisioned"],
"delta": 50,
"message": "Cluster memory was overprovisioned {entry}",
},
}
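Each map entry pairs a callable returning {entry, check, details} dicts with the check values that trigger a fault; a hypothetical additional entry would follow the same shape (nothing below exists in PVC itself):

def get_clock_skew_states():
    # Hypothetical checker: one dict per node
    return [{"entry": "hv1", "check": "skewed", "details": "offset 1.2s"}]

self.cluster_faults_map["clock_skew"] = {
    "entries": get_clock_skew_states,
    "conditions": ["skewed"],
    "delta": 10,
    "message": "Node {entry} clock was skewed ({details})",
}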
# Get a list of plugins from the plugin_directory
plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
@ -344,38 +465,85 @@ class MonitoringInstance(object):
)
)
self.run_plugins()
self.start_check_timer()
self.start_timer()
def __del__(self):
self.shutdown()
def shutdown(self):
self.stop_check_timer()
self.stop_timer()
self.run_cleanups()
return
def start_check_timer(self):
check_interval = self.config["monitoring_interval"]
def start_timer(self):
check_interval = int(self.config["monitoring_interval"])
self.timer = BackgroundScheduler()
self.timer.add_job(
self.run_checks,
trigger="interval",
seconds=check_interval,
)
self.logger.out(
f"Starting monitoring check timer ({check_interval} second interval)",
state="s",
)
self.check_timer = BackgroundScheduler()
self.check_timer.add_job(
self.run_plugins,
trigger="interval",
seconds=check_interval,
)
self.check_timer.start()
self.timer.start()
def stop_check_timer(self):
self.run_checks()
def stop_timer(self):
try:
self.check_timer.shutdown()
self.logger.out("Stopping monitoring check timer", state="s")
self.timer.shutdown()
except Exception:
self.logger.out("Failed to stop monitoring check timer", state="w")
def run_faults(self, coordinator_state=None):
self.logger.out(
f"Starting cluster fault check run at {datetime.now()}",
state="t",
)
for fault_type in self.cluster_faults_map.keys():
fault_details = self.cluster_faults_map[fault_type]
if self.config["log_monitoring_details"] or self.config["debug"]:
self.logger.out(
f"Running fault check {fault_type}",
state="t",
)
entries = fault_details["entries"]()
if self.config["debug"]:
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
for _entry in entries:
entry = _entry["entry"]
check = _entry["check"]
details = _entry["details"]
for condition in fault_details["conditions"]:
if str(condition) == str(check):
fault_time = datetime.now()
fault_delta = fault_details["delta"]
fault_message = fault_details["message"].format(
entry=entry, details=details
)
generate_fault(
self.zkhandler,
self.logger,
fault_type,
fault_time,
fault_delta,
fault_message,
)
self.faults += 1
def run_plugin(self, plugin):
time_start = datetime.now()
try:
@ -394,19 +562,9 @@ class MonitoringInstance(object):
result.to_zookeeper()
return result
def run_plugins(self):
if self.this_node.coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif self.this_node.coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
active_coordinator_state = self.this_node.coordinator_state
runtime_start = datetime.now()
def run_plugins(self, coordinator_state=None):
self.logger.out(
"Starting monitoring healthcheck run",
f"Starting node plugin check run at {datetime.now()}",
state="t",
)
@ -427,7 +585,32 @@ class MonitoringInstance(object):
state="t",
prefix=f"{result.plugin_name} ({result.runtime}s)",
)
total_health -= result.health_delta
# Generate a cluster fault if the plugin is in a suboptimal state
if result.health_delta > 0:
fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
fault_time = datetime.now()
# Map our check results to fault results
# These are not 1-to-1, as faults are cluster-wide.
# We halve the delta so that two nodes reporting the same problem
# add up to the health delta the plugin result reports.
fault_delta = int(result.health_delta / 2)
fault_message = (
f"{self.this_node.name} {result.plugin_name}: {result.message}"
)
generate_fault(
self.zkhandler,
self.logger,
fault_type,
fault_time,
fault_delta,
fault_message,
)
self.faults += 1
total_health -= result.health_delta
if total_health < 0:
total_health = 0
@ -441,38 +624,6 @@ class MonitoringInstance(object):
]
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
time.sleep(0.2)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = str(self.this_node.health) + "%"
else:
health_colour = self.logger.fmt_blue
health_text = "N/A"
self.logger.out(
"{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
health_colour=health_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
health=health_text,
runtime=runtime,
),
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()
@ -494,3 +645,66 @@ class MonitoringInstance(object):
),
]
)
def run_checks(self):
self.faults = 0
runtime_start = datetime.now()
coordinator_state = self.this_node.coordinator_state
if coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
self.run_plugins(coordinator_state=coordinator_state)
if coordinator_state in ["primary", "secondary", "takeover", "relinquish"]:
self.run_faults(coordinator_state=coordinator_state)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
result_text = list()
if coordinator_state in ["primary", "secondary", "takeover", "relinquish"]:
if self.faults > 0:
fault_colour = self.logger.fmt_red
else:
fault_colour = self.logger.fmt_green
if self.faults != 1:
s = "s"
else:
s = ""
fault_text = f"{fault_colour}{self.faults}{self.logger.fmt_end} fault{s}"
result_text.append(fault_text)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = f"{health_colour}{self.this_node.health}%{self.logger.fmt_end} node health"
result_text.append(health_text)
else:
health_text = "{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
result_text.append(health_text)
self.logger.out(
"{start_colour}{hostname} health check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {result_text} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=coordinator_state,
runtime=runtime,
result_text=", ".join(result_text),
),
state="t",
)

View File

@ -216,7 +216,7 @@ timer:
keepalive_interval: 5
# Monitoring interval (seconds)
monitoring_interval: 60
monitoring_interval: 15
# Fencing configuration
fencing: