Fix final termination of logger
We need to do a bit more finagling with the logger on termination to ensure that all messages are written and the queue drained before actually terminating.
This commit is contained in:
		| @@ -91,6 +91,17 @@ class Logger(object): | |||||||
|         self.writer.close() |         self.writer.close() | ||||||
|         self.writer = open(self.logfile, 'a', buffering=0) |         self.writer = open(self.logfile, 'a', buffering=0) | ||||||
|  |  | ||||||
|  |     # Provide a termination function so all messages are flushed before terminating the main daemon | ||||||
|  |     def terminate(self): | ||||||
|  |         if self.config['file_logging']: | ||||||
|  |             self.writer.close() | ||||||
|  |         if self.config['zookeeper_logging']: | ||||||
|  |             self.out("Waiting for Zookeeper message queue to drain", state='s') | ||||||
|  |             while not self.zookeeper_logger.queue.empty(): | ||||||
|  |                 pass | ||||||
|  |             self.zookeeper_logger.stop() | ||||||
|  |             self.zookeeper_logger.join() | ||||||
|  |  | ||||||
|     # Output function |     # Output function | ||||||
|     def out(self, message, state=None, prefix=''): |     def out(self, message, state=None, prefix=''): | ||||||
|  |  | ||||||
| @@ -154,6 +165,7 @@ class ZookeeperLogger(Thread): | |||||||
|         self.zkhandler = None |         self.zkhandler = None | ||||||
|         self.start_zkhandler() |         self.start_zkhandler() | ||||||
|         self.zkhandler.write([(('logs', self.node), '')]) |         self.zkhandler.write([(('logs', self.node), '')]) | ||||||
|  |         self.running = False | ||||||
|         Thread.__init__(self, args=(), kwargs=None) |         Thread.__init__(self, args=(), kwargs=None) | ||||||
|  |  | ||||||
|     def start_zkhandler(self): |     def start_zkhandler(self): | ||||||
| @@ -167,18 +179,28 @@ class ZookeeperLogger(Thread): | |||||||
|         self.zkhandler.connect(persistent=True) |         self.zkhandler.connect(persistent=True) | ||||||
|  |  | ||||||
|     def run(self): |     def run(self): | ||||||
|         while True: |         self.running = True | ||||||
|             message = self.queue.get() |         # Get the logs that are currently in Zookeeper and populate our deque | ||||||
|             self.write_log(message) |         logs = deque(self.zkhandler.read(('logs.messages', self.node)).split('\n'), self.max_lines) | ||||||
|  |         while self.running: | ||||||
|  |             # Get a new message | ||||||
|  |             try: | ||||||
|  |                 message = self.queue.get(block=False) | ||||||
|  |                 if not message: | ||||||
|  |                     continue | ||||||
|  |             except Exception: | ||||||
|  |                 continue | ||||||
|  |  | ||||||
|     def write_log(self, message): |  | ||||||
|         # Log to Zookeeper |  | ||||||
|         with self.zkhandler.writelock(('logs.messages', self.node)): |  | ||||||
|             logs = deque(self.zkhandler.read(('logs.messages', self.node)).split('\n'), self.max_lines) |  | ||||||
|             if not self.config['log_dates']: |             if not self.config['log_dates']: | ||||||
|                 # We want to log dates here, even if the log_dates config is not set |                 # We want to log dates here, even if the log_dates config is not set | ||||||
|                 date = '{} '.format(datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f')) |                 date = '{} '.format(datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f')) | ||||||
|             else: |             else: | ||||||
|                 date = '' |                 date = '' | ||||||
|  |             # Add the message to the deque | ||||||
|             logs.append(f'{date}{message}') |             logs.append(f'{date}{message}') | ||||||
|  |             # Write the updated messages into Zookeeper | ||||||
|             self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))]) |             self.zkhandler.write([(('logs.messages', self.node), '\n'.join(logs))]) | ||||||
|  |         return | ||||||
|  |  | ||||||
|  |     def stop(self): | ||||||
|  |         self.running = False | ||||||
|   | |||||||
| @@ -761,6 +761,7 @@ def cleanup(): | |||||||
|         pass |         pass | ||||||
|  |  | ||||||
|     logger.out('Terminated pvc daemon', state='s') |     logger.out('Terminated pvc daemon', state='s') | ||||||
|  |     logger.terminate() | ||||||
|  |  | ||||||
|     os._exit(0) |     os._exit(0) | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user