Improve Zookeeper log handling
Ensures that messages are fully read before each append. Adds more Zookeeper hits, but ensures logs won't be overwritten by multiple daemons. Also don't use a set on the client side, to avoid "removing duplicate" entries erroneously.
This commit is contained in:
parent
1adc3674b6
commit
7abc697c8a
|
@ -142,9 +142,8 @@ def follow_node_log(config, node, lines=10):
|
||||||
node_log = new_node_log
|
node_log = new_node_log
|
||||||
|
|
||||||
# Get the difference between the two sets of lines
|
# Get the difference between the two sets of lines
|
||||||
old_node_loglines_set = set(old_node_loglines)
|
|
||||||
diff_node_loglines = [
|
diff_node_loglines = [
|
||||||
x for x in new_node_loglines if x not in old_node_loglines_set
|
x for x in new_node_loglines if x not in old_node_loglines
|
||||||
]
|
]
|
||||||
|
|
||||||
# If there's a difference, print it out
|
# If there's a difference, print it out
|
||||||
|
|
|
@ -198,16 +198,9 @@ class ZookeeperLogger(Thread):
|
||||||
self.zkhandler.write([("base.logs", ""), (("logs", self.node), "")])
|
self.zkhandler.write([("base.logs", ""), (("logs", self.node), "")])
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self.connected:
|
|
||||||
self.start_zkhandler()
|
self.start_zkhandler()
|
||||||
sleep(1)
|
|
||||||
|
|
||||||
self.running = True
|
self.running = True
|
||||||
# Get the logs that are currently in Zookeeper and populate our deque
|
|
||||||
raw_logs = self.zkhandler.read(("logs.messages", self.node))
|
|
||||||
if raw_logs is None:
|
|
||||||
raw_logs = ""
|
|
||||||
logs = deque(raw_logs.split("\n"), self.max_lines)
|
|
||||||
while self.running:
|
while self.running:
|
||||||
# Get a new message
|
# Get a new message
|
||||||
try:
|
try:
|
||||||
|
@ -222,25 +215,26 @@ class ZookeeperLogger(Thread):
|
||||||
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
|
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
|
||||||
else:
|
else:
|
||||||
date = ""
|
date = ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.zkhandler.writelock(("logs.messages", self.node)):
|
||||||
|
# Get the logs that are currently in Zookeeper and populate our deque
|
||||||
|
cur_logs = self.zkhandler.read(("logs.messages", self.node))
|
||||||
|
if cur_logs is None:
|
||||||
|
cur_logs = ""
|
||||||
|
|
||||||
|
logs = deque(cur_logs.split("\n"), self.max_lines - 1)
|
||||||
|
|
||||||
# Add the message to the deque
|
# Add the message to the deque
|
||||||
logs.append(f"{date}{message}")
|
logs.append(f"{date}{message}")
|
||||||
|
|
||||||
tick_count = 0
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Write the updated messages into Zookeeper
|
# Write the updated messages into Zookeeper
|
||||||
self.zkhandler.write(
|
self.zkhandler.write(
|
||||||
[(("logs.messages", self.node), "\n".join(logs))]
|
[(("logs.messages", self.node), "\n".join(logs))]
|
||||||
)
|
)
|
||||||
break
|
|
||||||
except Exception:
|
except Exception:
|
||||||
# The write failed (connection loss, etc.) so retry for 15 seconds
|
|
||||||
sleep(0.5)
|
|
||||||
tick_count += 1
|
|
||||||
if tick_count > 30:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in New Issue