Use coordinator_state instead of router_state
Makes it much clearer what this variable represents: the node's coordinator state (e.g. primary, secondary, takeover, relinquish, or client).
This commit is contained in:
parent
40b7d68853
commit
254303b9d4
|
@ -235,12 +235,12 @@ def entrypoint():
|
||||||
|
|
||||||
# Force into secondary coordinator state if needed
|
# Force into secondary coordinator state if needed
|
||||||
try:
|
try:
|
||||||
if this_node.router_state == "primary" and len(d_node) > 1:
|
if this_node.coordinator_state == "primary" and len(d_node) > 1:
|
||||||
zkhandler.write([("base.config.primary_node", "none")])
|
zkhandler.write([("base.config.primary_node", "none")])
|
||||||
logger.out("Waiting for primary migration", state="s")
|
logger.out("Waiting for primary migration", state="s")
|
||||||
timeout = 240
|
timeout = 240
|
||||||
count = 0
|
count = 0
|
||||||
while this_node.router_state != "secondary" and count < timeout:
|
while this_node.coordinator_state != "secondary" and count < timeout:
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
count += 1
|
count += 1
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -431,7 +431,7 @@ def entrypoint():
|
||||||
if new_primary == "none":
|
if new_primary == "none":
|
||||||
if (
|
if (
|
||||||
this_node.daemon_state == "run"
|
this_node.daemon_state == "run"
|
||||||
and this_node.router_state
|
and this_node.coordinator_state
|
||||||
not in ["primary", "takeover", "relinquish"]
|
not in ["primary", "takeover", "relinquish"]
|
||||||
):
|
):
|
||||||
logger.out(
|
logger.out(
|
||||||
|
@ -477,7 +477,7 @@ def entrypoint():
|
||||||
state="i",
|
state="i",
|
||||||
)
|
)
|
||||||
elif new_primary == config["node_hostname"]:
|
elif new_primary == config["node_hostname"]:
|
||||||
if this_node.router_state == "secondary":
|
if this_node.coordinator_state == "secondary":
|
||||||
# Wait for 0.5s to ensure other contentions time out, then take over
|
# Wait for 0.5s to ensure other contentions time out, then take over
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
zkhandler.write(
|
zkhandler.write(
|
||||||
|
@ -489,7 +489,7 @@ def entrypoint():
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
# Wait for 0.5s to ensure other contentions time out, then relinquish
|
# Wait for 0.5s to ensure other contentions time out, then relinquish
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
zkhandler.write(
|
zkhandler.write(
|
||||||
|
@ -536,7 +536,7 @@ def entrypoint():
|
||||||
)
|
)
|
||||||
# Start primary functionality
|
# Start primary functionality
|
||||||
if (
|
if (
|
||||||
this_node.router_state == "primary"
|
this_node.coordinator_state == "primary"
|
||||||
and d_network[network].nettype == "managed"
|
and d_network[network].nettype == "managed"
|
||||||
):
|
):
|
||||||
d_network[network].createGateways()
|
d_network[network].createGateways()
|
||||||
|
@ -549,7 +549,7 @@ def entrypoint():
|
||||||
# TODO: Move this to the Network structure
|
# TODO: Move this to the Network structure
|
||||||
if d_network[network].nettype == "managed":
|
if d_network[network].nettype == "managed":
|
||||||
# Stop primary functionality
|
# Stop primary functionality
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
d_network[network].stopDHCPServer()
|
d_network[network].stopDHCPServer()
|
||||||
d_network[network].removeGateways()
|
d_network[network].removeGateways()
|
||||||
dns_aggregator.remove_network(d_network[network])
|
dns_aggregator.remove_network(d_network[network])
|
||||||
|
|
|
@ -368,7 +368,7 @@ class MonitoringInstance(object):
|
||||||
def run_plugin(self, plugin):
|
def run_plugin(self, plugin):
|
||||||
time_start = datetime.now()
|
time_start = datetime.now()
|
||||||
try:
|
try:
|
||||||
result = plugin.run(coordinator_state=self.this_node.router_state)
|
result = plugin.run(coordinator_state=self.this_node.coordinator_state)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
|
f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
|
||||||
|
@ -384,9 +384,9 @@ class MonitoringInstance(object):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def run_plugins(self):
|
def run_plugins(self):
|
||||||
if self.this_node.router_state == "primary":
|
if self.this_node.coordinator_state == "primary":
|
||||||
cst_colour = self.logger.fmt_green
|
cst_colour = self.logger.fmt_green
|
||||||
elif self.this_node.router_state == "secondary":
|
elif self.this_node.coordinator_state == "secondary":
|
||||||
cst_colour = self.logger.fmt_blue
|
cst_colour = self.logger.fmt_blue
|
||||||
else:
|
else:
|
||||||
cst_colour = self.logger.fmt_cyan
|
cst_colour = self.logger.fmt_cyan
|
||||||
|
@ -398,7 +398,7 @@ class MonitoringInstance(object):
|
||||||
datetime.now(),
|
datetime.now(),
|
||||||
self.logger.fmt_end,
|
self.logger.fmt_end,
|
||||||
self.logger.fmt_bold + cst_colour,
|
self.logger.fmt_bold + cst_colour,
|
||||||
self.this_node.router_state,
|
self.this_node.coordinator_state,
|
||||||
self.logger.fmt_end,
|
self.logger.fmt_end,
|
||||||
),
|
),
|
||||||
state="t",
|
state="t",
|
||||||
|
|
|
@ -52,7 +52,7 @@ class NodeInstance(object):
|
||||||
# States
|
# States
|
||||||
self.daemon_mode = self.zkhandler.read(("node.mode", self.name))
|
self.daemon_mode = self.zkhandler.read(("node.mode", self.name))
|
||||||
self.daemon_state = "stop"
|
self.daemon_state = "stop"
|
||||||
self.router_state = "client"
|
self.coordinator_state = "client"
|
||||||
self.domain_state = "flushed"
|
self.domain_state = "flushed"
|
||||||
# Object lists
|
# Object lists
|
||||||
self.d_node = d_node
|
self.d_node = d_node
|
||||||
|
@ -149,10 +149,10 @@ class NodeInstance(object):
|
||||||
|
|
||||||
if self.name == self.this_node and self.daemon_mode == "coordinator":
|
if self.name == self.this_node and self.daemon_mode == "coordinator":
|
||||||
# We're a coordinator so we care about networking
|
# We're a coordinator so we care about networking
|
||||||
if data != self.router_state:
|
if data != self.coordinator_state:
|
||||||
self.router_state = data
|
self.coordinator_state = data
|
||||||
if self.config["enable_networking"]:
|
if self.config["enable_networking"]:
|
||||||
if self.router_state == "takeover":
|
if self.coordinator_state == "takeover":
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Setting node {} to primary state".format(self.name),
|
"Setting node {} to primary state".format(self.name),
|
||||||
state="i",
|
state="i",
|
||||||
|
@ -161,7 +161,7 @@ class NodeInstance(object):
|
||||||
target=self.become_primary, args=(), kwargs={}
|
target=self.become_primary, args=(), kwargs={}
|
||||||
)
|
)
|
||||||
transition_thread.start()
|
transition_thread.start()
|
||||||
if self.router_state == "relinquish":
|
if self.coordinator_state == "relinquish":
|
||||||
# Skip becoming secondary unless already running
|
# Skip becoming secondary unless already running
|
||||||
if (
|
if (
|
||||||
self.daemon_state == "run"
|
self.daemon_state == "run"
|
||||||
|
@ -539,7 +539,7 @@ class NodeInstance(object):
|
||||||
tick = 1
|
tick = 1
|
||||||
patroni_failed = True
|
patroni_failed = True
|
||||||
# As long as we're in takeover, keep trying to set the Patroni leader to us
|
# As long as we're in takeover, keep trying to set the Patroni leader to us
|
||||||
while self.router_state == "takeover":
|
while self.coordinator_state == "takeover":
|
||||||
# Switch Patroni leader to the local instance
|
# Switch Patroni leader to the local instance
|
||||||
retcode, stdout, stderr = common.run_os_command(
|
retcode, stdout, stderr = common.run_os_command(
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -306,11 +306,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
|
|
||||||
if data and self.ip6_gateway != data.decode("ascii"):
|
if data and self.ip6_gateway != data.decode("ascii"):
|
||||||
orig_gateway = self.ip6_gateway
|
orig_gateway = self.ip6_gateway
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
if orig_gateway:
|
if orig_gateway:
|
||||||
self.removeGateway6Address()
|
self.removeGateway6Address()
|
||||||
self.ip6_gateway = data.decode("ascii")
|
self.ip6_gateway = data.decode("ascii")
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
self.createGateway6Address()
|
self.createGateway6Address()
|
||||||
if self.dhcp_server_daemon:
|
if self.dhcp_server_daemon:
|
||||||
self.stopDHCPServer()
|
self.stopDHCPServer()
|
||||||
|
@ -333,13 +333,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
if (
|
if (
|
||||||
self.dhcp6_flag
|
self.dhcp6_flag
|
||||||
and not self.dhcp_server_daemon
|
and not self.dhcp_server_daemon
|
||||||
and self.this_node.router_state in ["primary", "takeover"]
|
and self.this_node.coordinator_state in ["primary", "takeover"]
|
||||||
):
|
):
|
||||||
self.startDHCPServer()
|
self.startDHCPServer()
|
||||||
elif (
|
elif (
|
||||||
self.dhcp_server_daemon
|
self.dhcp_server_daemon
|
||||||
and not self.dhcp4_flag
|
and not self.dhcp4_flag
|
||||||
and self.this_node.router_state in ["primary", "takeover"]
|
and self.this_node.coordinator_state in ["primary", "takeover"]
|
||||||
):
|
):
|
||||||
self.stopDHCPServer()
|
self.stopDHCPServer()
|
||||||
|
|
||||||
|
@ -371,11 +371,11 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
|
|
||||||
if data and self.ip4_gateway != data.decode("ascii"):
|
if data and self.ip4_gateway != data.decode("ascii"):
|
||||||
orig_gateway = self.ip4_gateway
|
orig_gateway = self.ip4_gateway
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
if orig_gateway:
|
if orig_gateway:
|
||||||
self.removeGateway4Address()
|
self.removeGateway4Address()
|
||||||
self.ip4_gateway = data.decode("ascii")
|
self.ip4_gateway = data.decode("ascii")
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
self.createGateway4Address()
|
self.createGateway4Address()
|
||||||
if self.dhcp_server_daemon:
|
if self.dhcp_server_daemon:
|
||||||
self.stopDHCPServer()
|
self.stopDHCPServer()
|
||||||
|
@ -398,13 +398,13 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
if (
|
if (
|
||||||
self.dhcp4_flag
|
self.dhcp4_flag
|
||||||
and not self.dhcp_server_daemon
|
and not self.dhcp_server_daemon
|
||||||
and self.this_node.router_state in ["primary", "takeover"]
|
and self.this_node.coordinator_state in ["primary", "takeover"]
|
||||||
):
|
):
|
||||||
self.startDHCPServer()
|
self.startDHCPServer()
|
||||||
elif (
|
elif (
|
||||||
self.dhcp_server_daemon
|
self.dhcp_server_daemon
|
||||||
and not self.dhcp6_flag
|
and not self.dhcp6_flag
|
||||||
and self.this_node.router_state in ["primary", "takeover"]
|
and self.this_node.coordinator_state in ["primary", "takeover"]
|
||||||
):
|
):
|
||||||
self.stopDHCPServer()
|
self.stopDHCPServer()
|
||||||
|
|
||||||
|
@ -450,7 +450,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
if self.dhcp_reservations != new_reservations:
|
if self.dhcp_reservations != new_reservations:
|
||||||
old_reservations = self.dhcp_reservations
|
old_reservations = self.dhcp_reservations
|
||||||
self.dhcp_reservations = new_reservations
|
self.dhcp_reservations = new_reservations
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
self.updateDHCPReservations(old_reservations, new_reservations)
|
self.updateDHCPReservations(old_reservations, new_reservations)
|
||||||
if self.dhcp_server_daemon:
|
if self.dhcp_server_daemon:
|
||||||
self.stopDHCPServer()
|
self.stopDHCPServer()
|
||||||
|
@ -706,7 +706,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
self.createGateway4Address()
|
self.createGateway4Address()
|
||||||
|
|
||||||
def createGateway6Address(self):
|
def createGateway6Address(self):
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Creating gateway {}/{} on interface {}".format(
|
"Creating gateway {}/{} on interface {}".format(
|
||||||
self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic
|
self.ip6_gateway, self.ip6_cidrnetmask, self.bridge_nic
|
||||||
|
@ -719,7 +719,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
)
|
)
|
||||||
|
|
||||||
def createGateway4Address(self):
|
def createGateway4Address(self):
|
||||||
if self.this_node.router_state in ["primary", "takeover"]:
|
if self.this_node.coordinator_state in ["primary", "takeover"]:
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Creating gateway {}/{} on interface {}".format(
|
"Creating gateway {}/{} on interface {}".format(
|
||||||
self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic
|
self.ip4_gateway, self.ip4_cidrnetmask, self.bridge_nic
|
||||||
|
@ -733,7 +733,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
|
||||||
|
|
||||||
def startDHCPServer(self):
|
def startDHCPServer(self):
|
||||||
if (
|
if (
|
||||||
self.this_node.router_state in ["primary", "takeover"]
|
self.this_node.coordinator_state in ["primary", "takeover"]
|
||||||
and self.nettype == "managed"
|
and self.nettype == "managed"
|
||||||
):
|
):
|
||||||
self.logger.out(
|
self.logger.out(
|
||||||
|
|
|
@ -98,7 +98,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||||
return
|
return
|
||||||
|
|
||||||
# Primary-only functions
|
# Primary-only functions
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
# Get Ceph status information (pretty)
|
# Get Ceph status information (pretty)
|
||||||
if debug:
|
if debug:
|
||||||
logger.out(
|
logger.out(
|
||||||
|
@ -417,7 +417,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Upload OSD data for the cluster (primary-only)
|
# Upload OSD data for the cluster (primary-only)
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
if debug:
|
if debug:
|
||||||
logger.out(
|
logger.out(
|
||||||
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
|
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
|
||||||
|
@ -679,9 +679,9 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
|
||||||
|
|
||||||
# Display node information to the terminal
|
# Display node information to the terminal
|
||||||
if config["log_keepalives"]:
|
if config["log_keepalives"]:
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
cst_colour = logger.fmt_green
|
cst_colour = logger.fmt_green
|
||||||
elif this_node.router_state == "secondary":
|
elif this_node.coordinator_state == "secondary":
|
||||||
cst_colour = logger.fmt_blue
|
cst_colour = logger.fmt_blue
|
||||||
else:
|
else:
|
||||||
cst_colour = logger.fmt_cyan
|
cst_colour = logger.fmt_cyan
|
||||||
|
@ -692,7 +692,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
|
||||||
datetime.now(),
|
datetime.now(),
|
||||||
logger.fmt_end,
|
logger.fmt_end,
|
||||||
logger.fmt_bold + cst_colour,
|
logger.fmt_bold + cst_colour,
|
||||||
this_node.router_state,
|
this_node.coordinator_state,
|
||||||
logger.fmt_end,
|
logger.fmt_end,
|
||||||
),
|
),
|
||||||
state="t",
|
state="t",
|
||||||
|
@ -700,7 +700,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
|
||||||
|
|
||||||
# Set the migration selector in Zookeeper for clients to read
|
# Set the migration selector in Zookeeper for clients to read
|
||||||
if config["enable_hypervisor"]:
|
if config["enable_hypervisor"]:
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
try:
|
try:
|
||||||
if (
|
if (
|
||||||
zkhandler.read("base.config.migration_target_selector")
|
zkhandler.read("base.config.migration_target_selector")
|
||||||
|
@ -723,7 +723,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
|
||||||
|
|
||||||
# Set the upstream IP in Zookeeper for clients to read
|
# Set the upstream IP in Zookeeper for clients to read
|
||||||
if config["enable_networking"]:
|
if config["enable_networking"]:
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
try:
|
try:
|
||||||
if (
|
if (
|
||||||
zkhandler.read("base.config.upstream_ip")
|
zkhandler.read("base.config.upstream_ip")
|
||||||
|
@ -757,7 +757,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
|
||||||
logger.out(
|
logger.out(
|
||||||
"Ensure the primary key is properly set", state="d", prefix="main-thread"
|
"Ensure the primary key is properly set", state="d", prefix="main-thread"
|
||||||
)
|
)
|
||||||
if this_node.router_state == "primary":
|
if this_node.coordinator_state == "primary":
|
||||||
if zkhandler.read("base.config.primary_node") != this_node.name:
|
if zkhandler.read("base.config.primary_node") != this_node.name:
|
||||||
zkhandler.write([("base.config.primary_node", this_node.name)])
|
zkhandler.write([("base.config.primary_node", this_node.name)])
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue