diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index 096f8f34..eed60677 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -40,8 +40,9 @@ from queue import Queue
 from xml.etree import ElementTree
 from rados import Rados
 
+from daemon_lib.zkhandler import ZKHandler
+
 import pvcnoded.log as log
-import pvcnoded.zkhandler as zkhandler
 import pvcnoded.fencing as fencing
 import pvcnoded.common as common
 
@@ -517,52 +518,18 @@ time.sleep(5)
 # PHASE 4 - Attempt to connect to the coordinators and start zookeeper client
 ###############################################################################
 
-# Start the connection to the coordinators
-zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
+# Create an instance of the handler
+zkhandler = ZKHandler(config, logger=logger)
+
 try:
     logger.out('Connecting to Zookeeper cluster nodes {}'.format(config['coordinators']), state='i')
     # Start connection
-    zk_conn.start()
+    zkhandler.connect(persistent=True)
 except Exception as e:
     logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
     exit(1)
 
 
-# Handle zookeeper failures
-def zk_listener(state):
-    global zk_conn, update_timer
-    if state == kazoo.client.KazooState.CONNECTED:
-        logger.out('Connection to Zookeeper restarted', state='o')
-
-        # Start keepalive thread
-        if update_timer:
-            update_timer = startKeepaliveTimer()
-    else:
-        # Stop keepalive thread
-        if update_timer:
-            stopKeepaliveTimer()
-
-        logger.out('Connection to Zookeeper lost; retrying', state='w')
-
-        while True:
-            time.sleep(1)
-
-            _zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
-            try:
-                _zk_conn.start()
-            except Exception:
-                del _zk_conn
-                continue
-
-            # Overwrite global zk_conn with new connection
-            zk_conn = _zk_conn
-            # Readd the listener
-            zk_conn.add_listener(zk_listener)
-            break
-
-
-zk_conn.add_listener(zk_listener)
-
 ###############################################################################
 # PHASE 5 - Gracefully handle termination
 ###############################################################################
@@ -570,12 +537,14 @@ zk_conn.add_listener(zk_listener)
 
 # Cleanup function
 def cleanup():
-    global zk_conn, update_timer, d_domain
+    global zkhandler, update_timer, d_domain
 
     logger.out('Terminating pvcnoded and cleaning up', state='s')
 
     # Set shutdown state in Zookeeper
-    zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(myhostname): 'shutdown'})
+    zkhandler.write([
+        ('/nodes/{}/daemonstate'.format(myhostname), 'shutdown')
+    ])
 
     # Waiting for any flushes to complete
     logger.out('Waiting for any active flushes', state='s')
@@ -596,9 +565,9 @@ def cleanup():
     # Force into secondary coordinator state if needed
     try:
         if this_node.router_state == 'primary':
-            zkhandler.writedata(zk_conn, {
-                '/primary_node': 'none'
-            })
+            zkhandler.write([
+                ('/primary_node', 'none')
+            ])
             logger.out('Waiting for primary migration', state='s')
             while this_node.router_state != 'secondary':
                 time.sleep(0.5)
@@ -617,15 +586,17 @@ def cleanup():
     node_keepalive()
 
     # Set stop state in Zookeeper
-    zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(myhostname): 'stop'})
+    zkhandler.write([
+        ('/nodes/{}/daemonstate'.format(myhostname), 'stop')
+    ])
 
     # Forcibly terminate dnsmasq because it gets stuck sometimes
    common.run_os_command('killall dnsmasq')
 
     # Close the Zookeeper connection
     try:
-        zk_conn.stop()
-        zk_conn.close()
+        zkhandler.disconnect()
+        del zkhandler
     except Exception:
         pass
 
@@ -655,54 +626,54 @@ signal.signal(signal.SIGHUP, hup)
 ###############################################################################
 
 # Check if our node exists in Zookeeper, and create it if not
-if zk_conn.exists('/nodes/{}'.format(myhostname)):
+if zkhandler.exists('/nodes/{}'.format(myhostname)):
     logger.out("Node is " + fmt_green + "present" + fmt_end + " in Zookeeper", state='i')
     if config['daemon_mode'] == 'coordinator':
         init_routerstate = 'secondary'
     else:
         init_routerstate = 'client'
     # Update static data just in case it's changed
-    zkhandler.writedata(zk_conn, {
-        '/nodes/{}/daemonmode'.format(myhostname): config['daemon_mode'],
-        '/nodes/{}/daemonstate'.format(myhostname): 'init',
-        '/nodes/{}/routerstate'.format(myhostname): init_routerstate,
-        '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata),
+    zkhandler.write([
+        ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
+        ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
+        ('/nodes/{}/routerstate'.format(myhostname), init_routerstate),
+        ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
         # Keepalives and fencing information (always load and set from config on boot)
-        '/nodes/{}/ipmihostname'.format(myhostname): config['ipmi_hostname'],
-        '/nodes/{}/ipmiusername'.format(myhostname): config['ipmi_username'],
-        '/nodes/{}/ipmipassword'.format(myhostname): config['ipmi_password']
-    })
+        ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
+        ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
+        ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
+    ])
 else:
     logger.out("Node is " + fmt_red + "absent" + fmt_end + " in Zookeeper; adding new node", state='i')
     keepalive_time = int(time.time())
-    zkhandler.writedata(zk_conn, {
-        '/nodes/{}'.format(myhostname): config['daemon_mode'],
+    zkhandler.write([
+        ('/nodes/{}'.format(myhostname), config['daemon_mode']),
         # Basic state information
-        '/nodes/{}/daemonmode'.format(myhostname): config['daemon_mode'],
-        '/nodes/{}/daemonstate'.format(myhostname): 'init',
-        '/nodes/{}/routerstate'.format(myhostname): 'client',
-        '/nodes/{}/domainstate'.format(myhostname): 'flushed',
-        '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata),
-        '/nodes/{}/memtotal'.format(myhostname): '0',
-        '/nodes/{}/memfree'.format(myhostname): '0',
-        '/nodes/{}/memused'.format(myhostname): '0',
-        '/nodes/{}/memalloc'.format(myhostname): '0',
-        '/nodes/{}/memprov'.format(myhostname): '0',
-        '/nodes/{}/vcpualloc'.format(myhostname): '0',
-        '/nodes/{}/cpuload'.format(myhostname): '0.0',
-        '/nodes/{}/networkscount'.format(myhostname): '0',
-        '/nodes/{}/domainscount'.format(myhostname): '0',
-        '/nodes/{}/runningdomains'.format(myhostname): '',
+        ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
+        ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
+        ('/nodes/{}/routerstate'.format(myhostname), 'client'),
+        ('/nodes/{}/domainstate'.format(myhostname), 'flushed'),
+        ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
+        ('/nodes/{}/memtotal'.format(myhostname), '0'),
+        ('/nodes/{}/memfree'.format(myhostname), '0'),
+        ('/nodes/{}/memused'.format(myhostname), '0'),
+        ('/nodes/{}/memalloc'.format(myhostname), '0'),
+        ('/nodes/{}/memprov'.format(myhostname), '0'),
+        ('/nodes/{}/vcpualloc'.format(myhostname), '0'),
+        ('/nodes/{}/cpuload'.format(myhostname), '0.0'),
+        ('/nodes/{}/networkscount'.format(myhostname), '0'),
+        ('/nodes/{}/domainscount'.format(myhostname), '0'),
+        ('/nodes/{}/runningdomains'.format(myhostname), ''),
         # Keepalives and fencing information
-        '/nodes/{}/keepalive'.format(myhostname): str(keepalive_time),
-        '/nodes/{}/ipmihostname'.format(myhostname): config['ipmi_hostname'],
-        '/nodes/{}/ipmiusername'.format(myhostname): config['ipmi_username'],
-        '/nodes/{}/ipmipassword'.format(myhostname): config['ipmi_password']
-    })
+        ('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time)),
+        ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
+        ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
+        ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
+    ])
 
 # Check that the primary key exists, and create it with us as master if not
 try:
-    current_primary = zkhandler.readdata(zk_conn, '/primary_node')
+    current_primary = zkhandler.read('/primary_node')
 except kazoo.exceptions.NoNodeError:
     current_primary = 'none'
 
@@ -711,7 +682,9 @@ if current_primary and current_primary != 'none':
 else:
     if config['daemon_mode'] == 'coordinator':
         logger.out('No primary node found; creating with us as primary.', state='i')
-        zkhandler.writedata(zk_conn, {'/primary_node': myhostname})
+        zkhandler.write([
+            ('/primary_node', myhostname)
+        ])
 ###############################################################################
 # PHASE 7a - Ensure IPMI is reachable and working
 ###############################################################################
@@ -806,8 +779,8 @@ volume_list = dict()  # Dict of Lists
 if enable_networking:
     # Create an instance of the DNS Aggregator and Metadata API if we're a coordinator
     if config['daemon_mode'] == 'coordinator':
-        dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zk_conn, config, logger)
-        metadata_api = MetadataAPIInstance.MetadataAPIInstance(zk_conn, config, logger)
+        dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zkhandler.zk_conn, config, logger)
+        metadata_api = MetadataAPIInstance.MetadataAPIInstance(zkhandler.zk_conn, config, logger)
     else:
         dns_aggregator = None
         metadata_api = None
@@ -817,14 +790,14 @@ else:
 
 
 # Node objects
-@zk_conn.ChildrenWatch('/nodes')
+@zkhandler.zk_conn.ChildrenWatch('/nodes')
 def update_nodes(new_node_list):
     global node_list, d_node
 
     # Add any missing nodes to the list
     for node in new_node_list:
         if node not in node_list:
-            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
+            d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler.zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
 
     # Remove any deleted nodes from the list
     for node in node_list:
@@ -846,7 +819,7 @@ this_node = d_node[myhostname]
 
 
 # Maintenance mode
-@zk_conn.DataWatch('/maintenance')
+@zkhandler.zk_conn.DataWatch('/maintenance')
 def set_maintenance(_maintenance, stat, event=''):
     global maintenance
     try:
@@ -856,7 +829,7 @@ def set_maintenance(_maintenance, stat, event=''):
 
 
 # Primary node
-@zk_conn.DataWatch('/primary_node')
+@zkhandler.zk_conn.DataWatch('/primary_node')
 def update_primary(new_primary, stat, event=''):
     try:
         new_primary = new_primary.decode('ascii')
@@ -871,7 +844,7 @@ def update_primary(new_primary, stat, event=''):
         if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']:
             logger.out('Contending for primary coordinator state', state='i')
             # Acquire an exclusive lock on the primary_node key
-            primary_lock = zkhandler.exclusivelock(zk_conn, '/primary_node')
+            primary_lock = zkhandler.exclusivelock('/primary_node')
             try:
                 # This lock times out after 0.4s, which is 0.1s less than the pre-takeover
                 # timeout below, thus ensuring that a primary takeover will not deadlock
@@ -879,23 +852,31 @@ def update_primary(new_primary, stat, event=''):
                 primary_lock.acquire(timeout=0.4)
                 # Ensure when we get the lock that the versions are still consistent and that
                 # another node hasn't already acquired primary state
-                if key_version == zk_conn.get('/primary_node')[1].version:
-                    zkhandler.writedata(zk_conn, {'/primary_node': myhostname})
+                if key_version == zkhandler.zk_conn.get('/primary_node')[1].version:
+                    zkhandler.write([
+                        ('/primary_node', myhostname)
+                    ])
                 # Cleanly release the lock
                 primary_lock.release()
             # We timed out acquiring a lock, which means we failed contention, so just pass
-            except kazoo.exceptions.LockTimeout:
+            except Exception:
                 pass
     elif new_primary == myhostname:
         if this_node.router_state == 'secondary':
             time.sleep(0.5)
-            zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'takeover'})
+            zkhandler.write([
+                ('/nodes/{}/routerstate'.format(myhostname), 'takeover')
+            ])
     else:
        if this_node.router_state == 'primary':
            time.sleep(0.5)
-            zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'relinquish'})
+            zkhandler.write([
+                ('/nodes/{}/routerstate'.format(myhostname), 'relinquish')
+            ])
         else:
-            zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'client'})
+            zkhandler.write([
+                ('/nodes/{}/routerstate'.format(myhostname), 'client')
+            ])
 
     for node in d_node:
         d_node[node].primary_node = new_primary
@@ -903,14 +884,14 @@ def update_primary(new_primary, stat, event=''):
 
 if enable_networking:
     # Network objects
-    @zk_conn.ChildrenWatch('/networks')
+    @zkhandler.zk_conn.ChildrenWatch('/networks')
     def update_networks(new_network_list):
         global network_list, d_network
 
         # Add any missing networks to the list
         for network in new_network_list:
             if network not in network_list:
-                d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zk_conn, config, logger, this_node, dns_aggregator)
+                d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zkhandler.zk_conn, config, logger, this_node, dns_aggregator)
                 if config['daemon_mode'] == 'coordinator' and d_network[network].nettype == 'managed':
                     try:
                         dns_aggregator.add_network(d_network[network])
@@ -946,20 +927,20 @@ if enable_networking:
 
 if enable_hypervisor:
     # VM command pipeline key
-    @zk_conn.DataWatch('/cmd/domains')
+    @zkhandler.zk_conn.DataWatch('/cmd/domains')
     def cmd_domains(data, stat, event=''):
         if data:
-            VMInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'))
+            VMInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii'))
 
     # VM domain objects
-    @zk_conn.ChildrenWatch('/domains')
+    @zkhandler.zk_conn.ChildrenWatch('/domains')
     def update_domains(new_domain_list):
         global domain_list, d_domain
 
         # Add any missing domains to the list
         for domain in new_domain_list:
             if domain not in domain_list:
-                d_domain[domain] = VMInstance.VMInstance(domain, zk_conn, config, logger, this_node)
+                d_domain[domain] = VMInstance.VMInstance(domain, zkhandler.zk_conn, config, logger, this_node)
 
         # Remove any deleted domains from the list
         for domain in domain_list:
@@ -977,20 +958,20 @@ if enable_hypervisor:
 
 if enable_storage:
     # Ceph command pipeline key
-    @zk_conn.DataWatch('/cmd/ceph')
+    @zkhandler.zk_conn.DataWatch('/cmd/ceph')
     def cmd_ceph(data, stat, event=''):
         if data:
-            CephInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'), d_osd)
+            CephInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii'), d_osd)
 
     # OSD objects
-    @zk_conn.ChildrenWatch('/ceph/osds')
+    @zkhandler.zk_conn.ChildrenWatch('/ceph/osds')
     def update_osds(new_osd_list):
         global osd_list, d_osd
 
         # Add any missing OSDs to the list
         for osd in new_osd_list:
             if osd not in osd_list:
-                d_osd[osd] = CephInstance.CephOSDInstance(zk_conn, this_node, osd)
+                d_osd[osd] = CephInstance.CephOSDInstance(zkhandler.zk_conn, this_node, osd)
 
         # Remove any deleted OSDs from the list
         for osd in osd_list:
@@ -1003,14 +984,14 @@ if enable_storage:
         logger.out('{}OSD list:{} {}'.format(fmt_blue, fmt_end, ' '.join(osd_list)), state='i')
 
     # Pool objects
-    @zk_conn.ChildrenWatch('/ceph/pools')
+    @zkhandler.zk_conn.ChildrenWatch('/ceph/pools')
     def update_pools(new_pool_list):
         global pool_list, d_pool
 
         # Add any missing Pools to the list
         for pool in new_pool_list:
             if pool not in pool_list:
-                d_pool[pool] = CephInstance.CephPoolInstance(zk_conn, this_node, pool)
+                d_pool[pool] = CephInstance.CephPoolInstance(zkhandler.zk_conn, this_node, pool)
                 d_volume[pool] = dict()
                 volume_list[pool] = []
 
@@ -1026,14 +1007,14 @@ if enable_storage:
 
     # Volume objects in each pool
     for pool in pool_list:
-        @zk_conn.ChildrenWatch('/ceph/volumes/{}'.format(pool))
+        @zkhandler.zk_conn.ChildrenWatch('/ceph/volumes/{}'.format(pool))
         def update_volumes(new_volume_list):
             global volume_list, d_volume
 
             # Add any missing Volumes to the list
             for volume in new_volume_list:
                 if volume not in volume_list[pool]:
-                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(zk_conn, this_node, pool, volume)
+                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler.zk_conn, this_node, pool, volume)
 
             # Remove any deleted Volumes from the list
             for volume in volume_list[pool]:
@@ -1092,9 +1073,9 @@ def collect_ceph_stats(queue):
         command = {"prefix": "status", "format": "pretty"}
         ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
         try:
-            zkhandler.writedata(zk_conn, {
-                '/ceph': str(ceph_status)
-            })
+            zkhandler.write([
+                ('/ceph', str(ceph_status))
+            ])
         except Exception as e:
             logger.out('Failed to set Ceph status data: {}'.format(e), state='e')
             return
@@ -1106,9 +1087,9 @@ def collect_ceph_stats(queue):
         command = {"prefix": "df", "format": "pretty"}
         ceph_df = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
         try:
-            zkhandler.writedata(zk_conn, {
-                '/ceph/util': str(ceph_df)
-            })
+            zkhandler.write([
+                ('/ceph/util', str(ceph_df))
+            ])
         except Exception as e:
             logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e')
             return
@@ -1171,9 +1152,9 @@ def collect_ceph_stats(queue):
                 }
 
                 # Write the pool data to Zookeeper
-                zkhandler.writedata(zk_conn, {
-                    '/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df))
-                })
+                zkhandler.write([
+                    ('/ceph/pools/{}/stats'.format(pool['name']), str(json.dumps(pool_df)))
+                ])
             except Exception as e:
                 # One or more of the status commands timed out, just continue
                 logger.out('Failed to format and send pool data: {}'.format(e), state='w')
@@ -1307,9 +1288,9 @@ def collect_ceph_stats(queue):
         for osd in osd_list:
             try:
                 stats = json.dumps(osd_stats[osd])
-                zkhandler.writedata(zk_conn, {
-                    '/ceph/osds/{}/stats'.format(osd): str(stats)
-                })
+                zkhandler.write([
+                    ('/ceph/osds/{}/stats'.format(osd), str(stats))
+                ])
             except KeyError as e:
                 # One or more of the status commands timed out, just continue
                 logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
@@ -1374,7 +1355,9 @@ def collect_vm_stats(queue):
             except Exception:
                 # Toggle a state "change"
                 logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
-                zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(domain): instance.getstate()})
+                zkhandler.write([
+                    ('/domains/{}/state'.format(domain), instance.getstate())
+                ])
         elif instance.getnode() == this_node.name:
             memprov += instance.getmemory()
 
@@ -1464,9 +1447,9 @@ def collect_vm_stats(queue):
             logger.out("Writing statistics for VM {} to Zookeeper".format(domain_name), state='d', prefix='vm-thread')
 
         try:
-            zkhandler.writedata(zk_conn, {
-                "/domains/{}/stats".format(domain_uuid): str(json.dumps(domain_stats))
-            })
+            zkhandler.write([
+                ("/domains/{}/stats".format(domain_uuid), str(json.dumps(domain_stats)))
+            ])
         except Exception as e:
             if debug:
                 logger.out("{}".format(e), state='d', prefix='vm-thread')
@@ -1492,18 +1475,22 @@ def node_keepalive():
     if config['enable_networking']:
         if this_node.router_state == 'primary':
             try:
-                if zkhandler.readdata(zk_conn, '/upstream_ip') != config['upstream_floating_ip']:
+                if zkhandler.read('/upstream_ip') != config['upstream_floating_ip']:
                     raise
             except Exception:
-                zkhandler.writedata(zk_conn, {'/upstream_ip': config['upstream_floating_ip']})
+                zkhandler.write([
+                    ('/upstream_ip', config['upstream_floating_ip'])
+                ])
 
     # Get past state and update if needed
     if debug:
         logger.out("Get past state and update if needed", state='d', prefix='main-thread')
-    past_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(this_node.name))
+    past_state = zkhandler.read('/nodes/{}/daemonstate'.format(this_node.name))
     if past_state != 'run':
         this_node.daemon_state = 'run'
-        zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(this_node.name): 'run'})
+        zkhandler.write([
+            ('/nodes/{}/daemonstate'.format(this_node.name), 'run')
+        ])
     else:
         this_node.daemon_state = 'run'
 
@@ -1511,8 +1498,10 @@ def node_keepalive():
     if debug:
         logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
     if this_node.router_state == 'primary':
-        if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
-            zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
+        if zkhandler.read('/primary_node') != this_node.name:
+            zkhandler.write([
+                ('/primary_node', this_node.name)
+            ])
 
     # Run VM statistics collection in separate thread for parallelization
     if enable_hypervisor:
@@ -1572,18 +1561,18 @@ def node_keepalive():
     if debug:
         logger.out("Set our information in zookeeper", state='d', prefix='main-thread')
     try:
-        zkhandler.writedata(zk_conn, {
-            '/nodes/{}/memtotal'.format(this_node.name): str(this_node.memtotal),
-            '/nodes/{}/memused'.format(this_node.name): str(this_node.memused),
-            '/nodes/{}/memfree'.format(this_node.name): str(this_node.memfree),
-            '/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc),
-            '/nodes/{}/memprov'.format(this_node.name): str(this_node.memprov),
-            '/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc),
-            '/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload),
-            '/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count),
-            '/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list),
-            '/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time)
-        })
+        zkhandler.write([
+            ('/nodes/{}/memtotal'.format(this_node.name), str(this_node.memtotal)),
+            ('/nodes/{}/memused'.format(this_node.name), str(this_node.memused)),
+            ('/nodes/{}/memfree'.format(this_node.name), str(this_node.memfree)),
+            ('/nodes/{}/memalloc'.format(this_node.name), str(this_node.memalloc)),
+            ('/nodes/{}/memprov'.format(this_node.name), str(this_node.memprov)),
+            ('/nodes/{}/vcpualloc'.format(this_node.name), str(this_node.vcpualloc)),
+            ('/nodes/{}/cpuload'.format(this_node.name), str(this_node.cpuload)),
+            ('/nodes/{}/domainscount'.format(this_node.name), str(this_node.domains_count)),
+            ('/nodes/{}/runningdomains'.format(this_node.name), ' '.join(this_node.domain_list)),
+            ('/nodes/{}/keepalive'.format(this_node.name), str(keepalive_time))
+        ])
     except Exception:
         logger.out('Failed to set keepalive data', state='e')
         return
@@ -1652,8 +1641,8 @@ def node_keepalive():
     if config['daemon_mode'] == 'coordinator':
         for node_name in d_node:
             try:
-                node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
-                node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name)))
+                node_daemon_state = zkhandler.read('/nodes/{}/daemonstate'.format(node_name))
+                node_keepalive = int(zkhandler.read('/nodes/{}/keepalive'.format(node_name)))
             except Exception:
                 node_daemon_state = 'unknown'
                 node_keepalive = 0
@@ -1664,15 +1653,17 @@ def node_keepalive():
             node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
             if node_keepalive < node_deadtime and node_daemon_state == 'run':
                 logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
-                zk_lock = zkhandler.writelock(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
+                zk_lock = zkhandler.writelock('/nodes/{}/daemonstate'.format(node_name))
                 with zk_lock:
                     # Ensures that, if we lost the lock race and come out of waiting,
                     # we won't try to trigger our own fence thread.
-                    if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead':
-                        fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
+                    if zkhandler.read('/nodes/{}/daemonstate'.format(node_name)) != 'dead':
+                        fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zkhandler.zk_conn, config, logger), kwargs={})
                         fence_thread.start()
                         # Write the updated data after we start the fence thread
-                        zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(node_name): 'dead'})
+                        zkhandler.write([
+                            ('/nodes/{}/daemonstate'.format(node_name), 'dead')
+                        ])
 
     if debug:
         logger.out("Keepalive finished", state='d', prefix='main-thread')
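
Reviewer note: this patch codes against the new daemon_lib.zkhandler.ZKHandler class, whose implementation lives in daemon_lib/zkhandler.py and is not part of this diff. The sketch below is illustrative only; the method names and signatures (connect, disconnect, exists, read, write, exclusivelock, writelock, and the exposed zk_conn attribute) come from the calls in the diff, but the method bodies are assumptions about a minimal Kazoo-backed implementation. The key convention change it shows: the old zkhandler.writedata(zk_conn, {...}) took an explicit connection plus an unordered dict, while the new zkhandler.write([...]) takes an ordered list of (key, value) tuples on the instance.

# Illustrative sketch of the ZKHandler interface assumed by this patch;
# the real class in daemon_lib/zkhandler.py is not shown in this diff.

from kazoo.client import KazooClient


class ZKHandler(object):
    def __init__(self, config, logger=None):
        self.coordinators = config['coordinators']
        self.logger = logger
        # The raw Kazoo client stays exposed as zk_conn so callers that
        # still need watches (DataWatch/ChildrenWatch) or direct get()
        # access can reach it, as the diff does throughout
        self.zk_conn = KazooClient(hosts=self.coordinators)

    def connect(self, persistent=False):
        # Assumption: with persistent=True the real handler also registers
        # a Kazoo state listener to reconnect automatically, which is what
        # lets Daemon.py drop the hand-rolled zk_listener() loop above
        self.zk_conn.start()

    def disconnect(self):
        self.zk_conn.stop()
        self.zk_conn.close()

    def exists(self, key):
        return self.zk_conn.exists(key)

    def read(self, key):
        # get() returns a (data, ZnodeStat) tuple; return the decoded data
        return self.zk_conn.get(key)[0].decode('ascii')

    def write(self, kvpairs):
        # Apply an ordered list of (key, value) tuples as one atomic
        # Kazoo transaction, creating any keys that do not yet exist
        transaction = self.zk_conn.transaction()
        for key, value in kvpairs:
            data = str(value).encode('ascii')
            if self.zk_conn.exists(key):
                transaction.set_data(key, data)
            else:
                transaction.create(key, data)
        return transaction.commit()

    def exclusivelock(self, key):
        # Kazoo Lock recipe; acquire(timeout=...) raises on failure, which
        # is why update_primary() above now catches a generic Exception
        # rather than kazoo.exceptions.LockTimeout
        return self.zk_conn.Lock(key)

    def writelock(self, key):
        return self.zk_conn.WriteLock(key)

Under these assumptions, the zkhandler.zk_conn references left in the diff (watch decorators, NodeInstance/VMInstance/CephInstance constructors, fencing.fenceNode) mark the call sites not yet converted to the wrapper API, which keeps this changeset mechanical: every writedata/readdata pair becomes a write/read on the handler, and connection lifecycle moves behind connect(persistent=True)/disconnect().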