diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 90602d97..ddb08cec 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -565,7 +565,7 @@ else: # This will only change by the API when triggered after seeing all nodes can update @zkhandler.zk_conn.DataWatch(zkschema.path('base.schema.version')) def update_schema(new_schema_version, stat, event=''): - global zkschema, zkhandler, update_timer + global zkschema, zkhandler, update_timer, node_schema_version new_schema_version = int(new_schema_version.decode('ascii')) @@ -577,6 +577,7 @@ def update_schema(new_schema_version, stat, event=''): # Prevent any keepalive updates while this happens if update_timer is not None: stopKeepaliveTimer() + time.sleep(1) # Perform the migration (primary only) if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname: @@ -587,7 +588,10 @@ def update_schema(new_schema_version, stat, event=''): with zkhandler.exclusivelock('/'): # Perform the schema migration tasks logger.out('Performing schema update', state='s') - zkschema.migrate(zkhandler, latest_schema_version) + if new_schema_version > node_schema_version: + zkschema.migrate(zkhandler, new_schema_version) + if new_schema_version < node_schema_version: + zkschema.rollback(zkhandler, new_schema_version) # Wait for the exclusive lock to be lifted else: logger.out('Non-primary node acquiring read lock', state='s') @@ -605,12 +609,8 @@ def update_schema(new_schema_version, stat, event=''): zkhandler.write([ (zkschema.path('node.data.active_schema', myhostname), new_schema_version) ]) - - # Restart the zookeeper connection - logger.out('Restarting Zookeeper connection', state='s') - zkhandler.disconnect() + node_schema_version = new_schema_version time.sleep(1) - zkhandler.connect(persistent=True) # Restart the update timer if update_timer is not None: @@ -626,7 +626,7 @@ def update_schema(new_schema_version, stat, event=''): if latest_schema_version > node_schema_version: node_latest_schema_version = list() for node in zkhandler.children(zkschema.path('base.node')): - node_latest_schema_version.append(zkhandler.read(zkschema.path('node.data.latest_schema', node))) + node_latest_schema_version.append(int(zkhandler.read(zkschema.path('node.data.latest_schema', node)))) # This is true if all elements of the latest schema version are identical to the latest version, # i.e. they have all had the latest schema installed and ready to load.