Support run_os_command timeout; use timeouts

This commit is contained in:
Joshua Boniface 2019-07-09 15:03:14 -04:00
parent 83a4140703
commit 1a8e7509f7
2 changed files with 60 additions and 50 deletions

View File

@ -910,7 +910,7 @@ def update_zookeeper():
# Get Ceph cluster health (for local printing) # Get Ceph cluster health (for local printing)
if debug: if debug:
print("Get Ceph cluster health (for local printing)") print("Get Ceph cluster health (for local printing)")
retcode, stdout, stderr = common.run_os_command('ceph --connect-timeout=1 health') retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
ceph_health = stdout.rstrip() ceph_health = stdout.rstrip()
if 'HEALTH_OK' in ceph_health: if 'HEALTH_OK' in ceph_health:
ceph_health_colour = logger.fmt_green ceph_health_colour = logger.fmt_green
@ -924,7 +924,7 @@ def update_zookeeper():
if debug: if debug:
print("Set ceph health information in zookeeper (primary only)") print("Set ceph health information in zookeeper (primary only)")
# Get status info # Get status info
retcode, stdout, stderr = common.run_os_command('ceph --connect-timeout=1 status') retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
ceph_status = stdout ceph_status = stdout
try: try:
zkhandler.writedata(zk_conn, { zkhandler.writedata(zk_conn, {
@ -939,7 +939,7 @@ def update_zookeeper():
if debug: if debug:
print("Set ceph rados df information in zookeeper (primary only)") print("Set ceph rados df information in zookeeper (primary only)")
# Get rados df info # Get rados df info
retcode, stdout, stderr = common.run_os_command('rados df') retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
rados_df = stdout rados_df = stdout
try: try:
zkhandler.writedata(zk_conn, { zkhandler.writedata(zk_conn, {
@ -955,7 +955,7 @@ def update_zookeeper():
print("Set pool information in zookeeper (primary only)") print("Set pool information in zookeeper (primary only)")
# Get pool info # Get pool info
pool_df = dict() pool_df = dict()
retcode, stdout, stderr = common.run_os_command('rados df --format json') retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
pool_df_raw = json.loads(stdout)['pools'] pool_df_raw = json.loads(stdout)['pools']
for pool in pool_df_raw: for pool in pool_df_raw:
pool_df.update({ pool_df.update({
@ -989,7 +989,7 @@ def update_zookeeper():
print("Get data from Ceph OSDs") print("Get data from Ceph OSDs")
# Parse the dump data # Parse the dump data
osd_dump = dict() osd_dump = dict()
retcode, stdout, stderr = common.run_os_command('ceph --connect-timeout=1 osd dump --format json') retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
osd_dump_raw = json.loads(stdout)['osds'] osd_dump_raw = json.loads(stdout)['osds']
if debug: if debug:
print("Loop through OSD dump") print("Loop through OSD dump")
@ -1007,7 +1007,7 @@ def update_zookeeper():
if debug: if debug:
print("Parse the OSD df data") print("Parse the OSD df data")
osd_df = dict() osd_df = dict()
retcode, stdout, stderr = common.run_os_command('ceph --connect-timeout=1 osd df --format json') retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
try: try:
osd_df_raw = json.loads(stdout)['nodes'] osd_df_raw = json.loads(stdout)['nodes']
except: except:
@ -1030,7 +1030,7 @@ def update_zookeeper():
if debug: if debug:
print("Parse the OSD status data") print("Parse the OSD status data")
osd_status = dict() osd_status = dict()
retcode, stdout, stderr = common.run_os_command('ceph --connect-timeout=1 osd status') retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
if debug: if debug:
print("Loop through OSD status data") print("Loop through OSD status data")
for line in stderr.split('\n'): for line in stderr.split('\n'):
@ -1154,6 +1154,36 @@ def update_zookeeper():
# Close the Libvirt connection # Close the Libvirt connection
lv_conn.close() lv_conn.close()
# Look for dead nodes and fence them
if debug:
print("Look for dead nodes and fence them")
if config['daemon_mode'] == 'coordinator':
for node_name in d_node:
try:
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name))
node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name)))
except:
node_daemon_state = 'unknown'
node_domain_state = 'unknown'
node_keepalive = 0
# Handle deadtime and fencing if needed
# (A node is considered dead when its keepalive timer is more than
# keepalive_interval * fence_intervals seconds out-of-date while in 'run' state)
node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) )
if node_keepalive < node_deadtime and node_daemon_state == 'run':
logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
zk_lock = zkhandler.writelock(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead':
fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
# Display node information to the terminal # Display node information to the terminal
if config['log_keepalives']: if config['log_keepalives']:
logger.out( logger.out(
@ -1198,35 +1228,6 @@ def update_zookeeper():
), ),
) )
# Look for dead nodes and fence them
if debug:
print("Look for dead nodes and fence them")
if config['daemon_mode'] == 'coordinator':
for node_name in d_node:
try:
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
node_domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node_name))
node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name)))
except:
node_daemon_state = 'unknown'
node_domain_state = 'unknown'
node_keepalive = 0
# Handle deadtime and fencing if needed
# (A node is considered dead when its keepalive timer is more than
# keepalive_interval * fence_intervals seconds out-of-date while in 'run' state)
node_deadtime = int(time.time()) - ( int(config['keepalive_interval']) * int(config['fence_intervals']) )
if node_keepalive < node_deadtime and node_daemon_state == 'run':
logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
zk_lock = zkhandler.writelock(zk_conn, '/nodes/{}/daemonstate'.format(node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead':
fence_thread = threading.Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={})
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' })
# Start keepalive thread # Start keepalive thread
update_timer = startKeepaliveTimer() update_timer = startKeepaliveTimer()

View File

@ -61,27 +61,36 @@ def run_os_daemon(command_string, environment=None, logfile=None):
return daemon return daemon
# Run a oneshot command, optionally without blocking # Run a oneshot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None): def run_os_command(command_string, background=False, environment=None, timeout=None):
command = command_string.split() command = command_string.split()
if background: if background:
def runcmd(): def runcmd():
try:
subprocess.run( subprocess.run(
command, command,
env=environment, env=environment,
timeout=timeout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
) )
except subprocess.TimeoutExpired:
pass
thread = threading.Thread(target=runcmd, args=()) thread = threading.Thread(target=runcmd, args=())
thread.start() thread.start()
return 0, None, None return 0, None, None
else: else:
try:
command_output = subprocess.run( command_output = subprocess.run(
command, command,
env=environment, env=environment,
timeout=timeout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
) )
retcode = command_output.returncode retcode = command_output.returncode
except subprocess.TimeoutExpired:
retcode = 128
try: try:
stdout = command_output.stdout.decode('ascii') stdout = command_output.stdout.decode('ascii')
except: except: