diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index b23fcb51..463eaf45 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -682,8 +682,8 @@ def cmd(data, stat, event=''): node, device, weight = args.split(',') if node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Add the OSD result = CephInstance.add_osd(zk_conn, logger, node, device, weight) # Command succeeded @@ -703,8 +703,8 @@ def cmd(data, stat, event=''): # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Remove the OSD result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) # Command succeeded @@ -724,8 +724,8 @@ def cmd(data, stat, event=''): # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Online the OSD result = CephInstance.in_osd(zk_conn, logger, osd_id) # Command succeeded @@ -745,8 +745,8 @@ def cmd(data, stat, event=''): # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Offline the OSD result = CephInstance.out_osd(zk_conn, logger, osd_id) # Command succeeded @@ -765,8 +765,8 @@ def cmd(data, stat, event=''): if this_node.router_state == 'primary': # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Set the property result = CephInstance.set_property(zk_conn, logger, option) # Command succeeded @@ -785,8 +785,8 @@ def cmd(data, stat, event=''): if this_node.router_state == 'primary': # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Unset the property result = CephInstance.unset_property(zk_conn, logger, option) # Command succeeded @@ -805,8 +805,8 @@ def cmd(data, stat, event=''): name, pgs = args.split(',') if this_node.router_state == 'primary': # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Add the pool result = CephInstance.add_pool(zk_conn, logger, name, pgs) # Command succeeded @@ -825,8 +825,8 @@ def cmd(data, stat, event=''): if this_node.router_state == 'primary': # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/cmd') - with lock: + zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with zk_lock: # Remove the pool result = CephInstance.remove_pool(zk_conn, logger, name) # Command succeeded @@ -1088,29 +1088,33 @@ def update_zookeeper(): # Close the Libvirt connection lv_conn.close() - # Update our local node lists - flushed_node_list = [] - active_node_list = [] - inactive_node_list = [] - for node_name in d_node: - try: - node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) - node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name)) - node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name))) - except: - node_daemon_state = 'unknown' - node_domain_state = 'unknown' - node_keepalive = 0 + # Look for dead nodes and fence them + if config['daemon_mode'] == 'coordinator': + for node_name in d_node: + try: + node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) + node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name)) + node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name))) + except: + node_daemon_state = 'unknown' + node_domain_state = 'unknown' + node_keepalive = 0 - # Handle deadtime and fencng if needed - # (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds - # out-of-date while in 'start' state) - node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) ) - if node_keepalive < node_deadtime and node_daemon_state == 'run': - logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w') - zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' }) - fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={}) - fence_thread.start() + # Handle deadtime and fencng if needed + # (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds + # out-of-date while in 'start' state) + node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) ) + if node_keepalive < node_deadtime and node_daemon_state == 'run': + logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w') + zk_lock = zkhandler.writelock(zk_conn, '/nodes/{}/daemonstate') + with zk_lock: + # Ensures that, if we lost the lock race and come out of waiting, + # we won't try to trigger our own fence thread. + if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate') != 'dead': + fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={}) + fence_thread.start() + # Write the updated data after we start the fence thread + zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' }) # Display node information to the terminal logger.out(