From e9df043c0a1d67000e2817f8845ba8ee948bf266 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 19 Jul 2021 11:45:58 -0400 Subject: [PATCH] Ensure ZK logging does not block startup --- daemon-common/log.py | 56 ++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/daemon-common/log.py b/daemon-common/log.py index e1375a88..37e79fc2 100644 --- a/daemon-common/log.py +++ b/daemon-common/log.py @@ -23,6 +23,7 @@ from collections import deque from threading import Thread from queue import Queue from datetime import datetime +from time import sleep from daemon_lib.zkhandler import ZKHandler @@ -83,7 +84,8 @@ class Logger(object): self.last_prompt = '' if self.config['zookeeper_logging']: - self.zookeeper_logger = ZookeeperLogger(config) + self.zookeeper_queue = Queue() + self.zookeeper_logger = ZookeeperLogger(self.config, self.zookeeper_queue) self.zookeeper_logger.start() # Provide a hup function to close and reopen the writer @@ -96,9 +98,15 @@ class Logger(object): if self.config['file_logging']: self.writer.close() if self.config['zookeeper_logging']: - self.out("Waiting for Zookeeper message queue to drain", state='s') - while not self.zookeeper_logger.queue.empty(): - pass + self.out("Waiting 15s for Zookeeper message queue to drain", state='s') + + tick_count = 0 + while not self.zookeeper_queue.empty(): + sleep(0.5) + tick_count += 1 + if tick_count > 30: + break + self.zookeeper_logger.stop() self.zookeeper_logger.join() @@ -145,7 +153,7 @@ class Logger(object): # Log to Zookeeper if self.config['zookeeper_logging']: - self.zookeeper_logger.queue.put(message) + self.zookeeper_queue.put(message) # Set last message variables self.last_colour = colour @@ -157,19 +165,14 @@ class ZookeeperLogger(Thread): Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other daemon events while the records are written. They will be eventually-consistent """ - def __init__(self, config): + def __init__(self, config, zookeeper_queue): self.config = config self.node = self.config['node'] self.max_lines = self.config['node_log_lines'] - self.queue = Queue() - self.zkhandler = None - self.start_zkhandler() - # Ensure the root keys for this are instantiated - self.zkhandler.write([ - ('base.logs', ''), - (('logs', self.node), '') - ]) + self.zookeeper_queue = zookeeper_queue + self.connected = False self.running = False + self.zkhandler = None Thread.__init__(self, args=(), kwargs=None) def start_zkhandler(self): @@ -179,10 +182,29 @@ class ZookeeperLogger(Thread): self.zkhandler.disconnect() except Exception: pass - self.zkhandler = ZKHandler(self.config, logger=None) - self.zkhandler.connect(persistent=True) + + while True: + try: + self.zkhandler = ZKHandler(self.config, logger=None) + self.zkhandler.connect(persistent=True) + break + except Exception: + sleep(0.5) + continue + + self.connected = True + + # Ensure the root keys for this are instantiated + self.zkhandler.write([ + ('base.logs', ''), + (('logs', self.node), '') + ]) def run(self): + while not self.connected: + self.start_zkhandler() + sleep(1) + self.running = True # Get the logs that are currently in Zookeeper and populate our deque raw_logs = self.zkhandler.read(('logs.messages', self.node)) @@ -192,7 +214,7 @@ class ZookeeperLogger(Thread): while self.running: # Get a new message try: - message = self.queue.get(timeout=1) + message = self.zookeeper_queue.get(timeout=1) if not message: continue except Exception: