Ensure ZK logging does not block startup

This commit is contained in:
Joshua Boniface 2021-07-19 11:45:58 -04:00
parent 71e4d0b32a
commit e9df043c0a
1 changed files with 39 additions and 17 deletions

View File

@ -23,6 +23,7 @@ from collections import deque
from threading import Thread from threading import Thread
from queue import Queue from queue import Queue
from datetime import datetime from datetime import datetime
from time import sleep
from daemon_lib.zkhandler import ZKHandler from daemon_lib.zkhandler import ZKHandler
@ -83,7 +84,8 @@ class Logger(object):
self.last_prompt = '' self.last_prompt = ''
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.zookeeper_logger = ZookeeperLogger(config) self.zookeeper_queue = Queue()
self.zookeeper_logger = ZookeeperLogger(self.config, self.zookeeper_queue)
self.zookeeper_logger.start() self.zookeeper_logger.start()
# Provide a hup function to close and reopen the writer # Provide a hup function to close and reopen the writer
@ -96,9 +98,15 @@ class Logger(object):
if self.config['file_logging']: if self.config['file_logging']:
self.writer.close() self.writer.close()
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.out("Waiting for Zookeeper message queue to drain", state='s') self.out("Waiting 15s for Zookeeper message queue to drain", state='s')
while not self.zookeeper_logger.queue.empty():
pass tick_count = 0
while not self.zookeeper_queue.empty():
sleep(0.5)
tick_count += 1
if tick_count > 30:
break
self.zookeeper_logger.stop() self.zookeeper_logger.stop()
self.zookeeper_logger.join() self.zookeeper_logger.join()
@ -145,7 +153,7 @@ class Logger(object):
# Log to Zookeeper # Log to Zookeeper
if self.config['zookeeper_logging']: if self.config['zookeeper_logging']:
self.zookeeper_logger.queue.put(message) self.zookeeper_queue.put(message)
# Set last message variables # Set last message variables
self.last_colour = colour self.last_colour = colour
@ -157,19 +165,14 @@ class ZookeeperLogger(Thread):
Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other Defines a threaded writer for Zookeeper locks. Threading prevents the blocking of other
daemon events while the records are written. They will be eventually-consistent daemon events while the records are written. They will be eventually-consistent
""" """
def __init__(self, config): def __init__(self, config, zookeeper_queue):
self.config = config self.config = config
self.node = self.config['node'] self.node = self.config['node']
self.max_lines = self.config['node_log_lines'] self.max_lines = self.config['node_log_lines']
self.queue = Queue() self.zookeeper_queue = zookeeper_queue
self.zkhandler = None self.connected = False
self.start_zkhandler()
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
self.running = False self.running = False
self.zkhandler = None
Thread.__init__(self, args=(), kwargs=None) Thread.__init__(self, args=(), kwargs=None)
def start_zkhandler(self): def start_zkhandler(self):
@ -179,10 +182,29 @@ class ZookeeperLogger(Thread):
self.zkhandler.disconnect() self.zkhandler.disconnect()
except Exception: except Exception:
pass pass
while True:
try:
self.zkhandler = ZKHandler(self.config, logger=None) self.zkhandler = ZKHandler(self.config, logger=None)
self.zkhandler.connect(persistent=True) self.zkhandler.connect(persistent=True)
break
except Exception:
sleep(0.5)
continue
self.connected = True
# Ensure the root keys for this are instantiated
self.zkhandler.write([
('base.logs', ''),
(('logs', self.node), '')
])
def run(self): def run(self):
while not self.connected:
self.start_zkhandler()
sleep(1)
self.running = True self.running = True
# Get the logs that are currently in Zookeeper and populate our deque # Get the logs that are currently in Zookeeper and populate our deque
raw_logs = self.zkhandler.read(('logs.messages', self.node)) raw_logs = self.zkhandler.read(('logs.messages', self.node))
@ -192,7 +214,7 @@ class ZookeeperLogger(Thread):
while self.running: while self.running:
# Get a new message # Get a new message
try: try:
message = self.queue.get(timeout=1) message = self.zookeeper_queue.get(timeout=1)
if not message: if not message:
continue continue
except Exception: except Exception: