diff --git a/pvcd.py b/pvcd.py
index 867b7d63..a7b9e9b3 100755
--- a/pvcd.py
+++ b/pvcd.py
@@ -102,12 +102,38 @@ except:
     print('ERROR: Failed to connect to Zookeeper')
     exit(1)
 
+# Handle Zookeeper connection state changes
+def zk_listener(state):
+    """Pause or resume the keepalive timer as the Zookeeper connection changes.
+
+    Kazoo reconnects a suspended client by itself and state listeners must not
+    block, so no replacement client is created and no retry loop runs here.
+    """
+    global update_timer
+    if state == kazoo.client.KazooState.SUSPENDED:
+        ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e')
+
+        # Stop keepalive thread until the connection is back
+        stopKeepaliveTimer(update_timer)
+    elif state == kazoo.client.KazooState.CONNECTED:
+        ansiiprint.echo('Connection to Zookeeper started', '', 'o')
+
+        # Start keepalive thread
+        update_timer = createKeepaliveTimer()
+    # Any other state (LOST) is left alone; CONNECTED is reported again once
+    # Kazoo has established a new session.
+
+# Cleanup function
 def cleanup(signum, frame):
     ansiiprint.echo('Terminating daemon', '', 'e')
+    # Set stop state in Zookeeper
     zk.set('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii'))
+    # Stop keepalive thread first so no update runs against a stopped client
+    stopKeepaliveTimer(update_timer)
+    # Stop the client before closing it; close() expects a stopped client
     zk.stop()
     zk.close()
-    update_timer.join()
+    # Exit
     sys.exit(0)
 
 # Handle signals gracefully
@@ -191,9 +217,26 @@ this_node = t_node[myhostname]
 update_zookeeper = this_node.update_zookeeper
 
 # Create timer to update this node in Zookeeper
-update_timer = apscheduler.schedulers.background.BackgroundScheduler()
-update_timer.add_job(update_zookeeper, 'interval', seconds=int(config['keepalive_interval']))
-update_timer.start()
+def createKeepaliveTimer():
+    """Start and return a scheduler running update_zookeeper at the keepalive interval."""
+    ansiiprint.echo('Starting keepalive timer', '', 'o')
+    update_timer = apscheduler.schedulers.background.BackgroundScheduler()
+    update_timer.add_job(update_zookeeper, 'interval', seconds=int(config['keepalive_interval']))
+    update_timer.start()
+    return update_timer
+
+def stopKeepaliveTimer(update_timer):
+    """Shut down the given keepalive scheduler if it is still running."""
+    ansiiprint.echo('Stopping keepalive timer', '', 'c')
+    # shutdown() raises on an already-stopped scheduler, e.g. cleanup while suspended
+    if update_timer.running:
+        update_timer.shutdown()
+
+# Start keepalive thread
+update_timer = createKeepaliveTimer()
+
+# Register the listener only now that the timer and its helpers exist
+zk.add_listener(zk_listener)
 
 # Tick loop
 while True: