Fix some bugs with hot reload

This commit is contained in:
Joshua Boniface 2021-06-09 00:03:26 -04:00
parent 5540bdc86b
commit e475552391
1 changed files with 8 additions and 8 deletions

View File

@ -565,7 +565,7 @@ else:
# This will only change by the API when triggered after seeing all nodes can update # This will only change by the API when triggered after seeing all nodes can update
@zkhandler.zk_conn.DataWatch(zkschema.path('base.schema.version')) @zkhandler.zk_conn.DataWatch(zkschema.path('base.schema.version'))
def update_schema(new_schema_version, stat, event=''): def update_schema(new_schema_version, stat, event=''):
global zkschema, zkhandler, update_timer global zkschema, zkhandler, update_timer, node_schema_version
new_schema_version = int(new_schema_version.decode('ascii')) new_schema_version = int(new_schema_version.decode('ascii'))
@ -577,6 +577,7 @@ def update_schema(new_schema_version, stat, event=''):
# Prevent any keepalive updates while this happens # Prevent any keepalive updates while this happens
if update_timer is not None: if update_timer is not None:
stopKeepaliveTimer() stopKeepaliveTimer()
time.sleep(1)
# Perform the migration (primary only) # Perform the migration (primary only)
if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname: if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname:
@ -587,7 +588,10 @@ def update_schema(new_schema_version, stat, event=''):
with zkhandler.exclusivelock('/'): with zkhandler.exclusivelock('/'):
# Perform the schema migration tasks # Perform the schema migration tasks
logger.out('Performing schema update', state='s') logger.out('Performing schema update', state='s')
zkschema.migrate(zkhandler, latest_schema_version) if new_schema_version > node_schema_version:
zkschema.migrate(zkhandler, new_schema_version)
if new_schema_version < node_schema_version:
zkschema.rollback(zkhandler, new_schema_version)
# Wait for the exclusive lock to be lifted # Wait for the exclusive lock to be lifted
else: else:
logger.out('Non-primary node acquiring read lock', state='s') logger.out('Non-primary node acquiring read lock', state='s')
@ -605,12 +609,8 @@ def update_schema(new_schema_version, stat, event=''):
zkhandler.write([ zkhandler.write([
(zkschema.path('node.data.active_schema', myhostname), new_schema_version) (zkschema.path('node.data.active_schema', myhostname), new_schema_version)
]) ])
node_schema_version = new_schema_version
# Restart the zookeeper connection
logger.out('Restarting Zookeeper connection', state='s')
zkhandler.disconnect()
time.sleep(1) time.sleep(1)
zkhandler.connect(persistent=True)
# Restart the update timer # Restart the update timer
if update_timer is not None: if update_timer is not None:
@ -626,7 +626,7 @@ def update_schema(new_schema_version, stat, event=''):
if latest_schema_version > node_schema_version: if latest_schema_version > node_schema_version:
node_latest_schema_version = list() node_latest_schema_version = list()
for node in zkhandler.children(zkschema.path('base.node')): for node in zkhandler.children(zkschema.path('base.node')):
node_latest_schema_version.append(zkhandler.read(zkschema.path('node.data.latest_schema', node))) node_latest_schema_version.append(int(zkhandler.read(zkschema.path('node.data.latest_schema', node))))
# This is true if all elements of the latest schema version are identical to the latest version, # This is true if all elements of the latest schema version are identical to the latest version,
# i.e. they have all had the latest schema installed and ready to load. # i.e. they have all had the latest schema installed and ready to load.