Finish some better Zookeeper listening logic and functionlize start/stop of timer
This commit is contained in:
parent
04f5fc8f32
commit
b1e77f6a20
45
pvcd.py
45
pvcd.py
|
@ -102,12 +102,43 @@ except:
|
||||||
print('ERROR: Failed to connect to Zookeeper')
|
print('ERROR: Failed to connect to Zookeeper')
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
|
# Handle zookeeper failures
|
||||||
|
def zk_listener(state):
|
||||||
|
global zk, update_timer
|
||||||
|
if state == kazoo.client.KazooState.SUSPENDED:
|
||||||
|
ansiiprint.echo('Connection to Zookeeper lost; retrying', '', 'e')
|
||||||
|
|
||||||
|
# Stop keepalive thread
|
||||||
|
stopKeepaliveTimer(update_timer)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
_zk = kazoo.client.KazooClient(hosts=config['zookeeper'])
|
||||||
|
try:
|
||||||
|
_zk.start()
|
||||||
|
zk = _zk
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(1)
|
||||||
|
elif state == kazoo.client.KazooState.CONNECTED:
|
||||||
|
ansiiprint.echo('Connection to Zookeeper started', '', 'o')
|
||||||
|
|
||||||
|
# Start keepalive thread
|
||||||
|
update_timer = createKeepaliveTimer()
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
zk.add_listener(zk_listener)
|
||||||
|
|
||||||
|
# Cleanup function
|
||||||
def cleanup(signum, frame):
|
def cleanup(signum, frame):
|
||||||
ansiiprint.echo('Terminating daemon', '', 'e')
|
ansiiprint.echo('Terminating daemon', '', 'e')
|
||||||
|
# Set stop state in Zookeeper
|
||||||
zk.set('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii'))
|
zk.set('/nodes/{}/daemonstate'.format(myhostname), 'stop'.encode('ascii'))
|
||||||
zk.stop()
|
# Close the Zookeeper connection
|
||||||
zk.close()
|
zk.close()
|
||||||
update_timer.join()
|
# Stop keepalive thread
|
||||||
|
stopKeepaliveTimer(update_timer)
|
||||||
|
# Exit
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# Handle signals gracefully
|
# Handle signals gracefully
|
||||||
|
@ -191,9 +222,19 @@ this_node = t_node[myhostname]
|
||||||
update_zookeeper = this_node.update_zookeeper
|
update_zookeeper = this_node.update_zookeeper
|
||||||
|
|
||||||
# Create timer to update this node in Zookeeper
|
# Create timer to update this node in Zookeeper
|
||||||
|
def createKeepaliveTimer():
|
||||||
|
ansiiprint.echo('Starting keepalive timer', '', 'o')
|
||||||
update_timer = apscheduler.schedulers.background.BackgroundScheduler()
|
update_timer = apscheduler.schedulers.background.BackgroundScheduler()
|
||||||
update_timer.add_job(update_zookeeper, 'interval', seconds=int(config['keepalive_interval']))
|
update_timer.add_job(update_zookeeper, 'interval', seconds=int(config['keepalive_interval']))
|
||||||
update_timer.start()
|
update_timer.start()
|
||||||
|
return update_timer
|
||||||
|
|
||||||
|
def stopKeepaliveTimer(update_timer):
|
||||||
|
ansiiprint.echo('Stopping keepalive timer', '', 'c')
|
||||||
|
update_timer.shutdown()
|
||||||
|
|
||||||
|
# Start keepalive thread
|
||||||
|
update_timer = createKeepaliveTimer()
|
||||||
|
|
||||||
# Tick loop
|
# Tick loop
|
||||||
while True:
|
while True:
|
||||||
|
|
Loading…
Reference in New Issue