Modify node daemon root to use updated zkhandler

This commit is contained in:
Joshua Boniface 2021-05-30 14:48:41 -04:00
parent ed4f84a3ec
commit ede3e88cd7
1 changed files with 139 additions and 148 deletions

View File

@ -40,8 +40,9 @@ from queue import Queue
from xml.etree import ElementTree from xml.etree import ElementTree
from rados import Rados from rados import Rados
from daemon_lib.zkhandler import ZKHandler
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler
import pvcnoded.fencing as fencing import pvcnoded.fencing as fencing
import pvcnoded.common as common import pvcnoded.common as common
@ -517,52 +518,18 @@ time.sleep(5)
# PHASE 4 - Attempt to connect to the coordinators and start zookeeper client # PHASE 4 - Attempt to connect to the coordinators and start zookeeper client
############################################################################### ###############################################################################
# Start the connection to the coordinators # Create an instance of the handler
zk_conn = kazoo.client.KazooClient(hosts=config['coordinators']) zkhandler = ZKHandler(config, logger=logger)
try: try:
logger.out('Connecting to Zookeeper cluster nodes {}'.format(config['coordinators']), state='i') logger.out('Connecting to Zookeeper cluster nodes {}'.format(config['coordinators']), state='i')
# Start connection # Start connection
zk_conn.start() zkhandler.connect(persistent=True)
except Exception as e: except Exception as e:
logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e') logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
exit(1) exit(1)
# Handle zookeeper failures
def zk_listener(state):
global zk_conn, update_timer
if state == kazoo.client.KazooState.CONNECTED:
logger.out('Connection to Zookeeper restarted', state='o')
# Start keepalive thread
if update_timer:
update_timer = startKeepaliveTimer()
else:
# Stop keepalive thread
if update_timer:
stopKeepaliveTimer()
logger.out('Connection to Zookeeper lost; retrying', state='w')
while True:
time.sleep(1)
_zk_conn = kazoo.client.KazooClient(hosts=config['coordinators'])
try:
_zk_conn.start()
except Exception:
del _zk_conn
continue
# Overwrite global zk_conn with new connection
zk_conn = _zk_conn
# Readd the listener
zk_conn.add_listener(zk_listener)
break
zk_conn.add_listener(zk_listener)
############################################################################### ###############################################################################
# PHASE 5 - Gracefully handle termination # PHASE 5 - Gracefully handle termination
############################################################################### ###############################################################################
@ -570,12 +537,14 @@ zk_conn.add_listener(zk_listener)
# Cleanup function # Cleanup function
def cleanup(): def cleanup():
global zk_conn, update_timer, d_domain global zkhandler, update_timer, d_domain
logger.out('Terminating pvcnoded and cleaning up', state='s') logger.out('Terminating pvcnoded and cleaning up', state='s')
# Set shutdown state in Zookeeper # Set shutdown state in Zookeeper
zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(myhostname): 'shutdown'}) zkhandler.write([
('/nodes/{}/daemonstate'.format(myhostname), 'shutdown')
])
# Waiting for any flushes to complete # Waiting for any flushes to complete
logger.out('Waiting for any active flushes', state='s') logger.out('Waiting for any active flushes', state='s')
@ -596,9 +565,9 @@ def cleanup():
# Force into secondary coordinator state if needed # Force into secondary coordinator state if needed
try: try:
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
zkhandler.writedata(zk_conn, { zkhandler.write([
'/primary_node': 'none' ('/primary_node', 'none')
}) ])
logger.out('Waiting for primary migration', state='s') logger.out('Waiting for primary migration', state='s')
while this_node.router_state != 'secondary': while this_node.router_state != 'secondary':
time.sleep(0.5) time.sleep(0.5)
@ -617,15 +586,17 @@ def cleanup():
node_keepalive() node_keepalive()
# Set stop state in Zookeeper # Set stop state in Zookeeper
zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(myhostname): 'stop'}) zkhandler.write([
('/nodes/{}/daemonstate'.format(myhostname), 'stop')
])
# Forcibly terminate dnsmasq because it gets stuck sometimes # Forcibly terminate dnsmasq because it gets stuck sometimes
common.run_os_command('killall dnsmasq') common.run_os_command('killall dnsmasq')
# Close the Zookeeper connection # Close the Zookeeper connection
try: try:
zk_conn.stop() zkhandler.disconnect()
zk_conn.close() del zkhandler
except Exception: except Exception:
pass pass
@ -655,54 +626,54 @@ signal.signal(signal.SIGHUP, hup)
############################################################################### ###############################################################################
# Check if our node exists in Zookeeper, and create it if not # Check if our node exists in Zookeeper, and create it if not
if zk_conn.exists('/nodes/{}'.format(myhostname)): if zkhandler.exists('/nodes/{}'.format(myhostname)):
logger.out("Node is " + fmt_green + "present" + fmt_end + " in Zookeeper", state='i') logger.out("Node is " + fmt_green + "present" + fmt_end + " in Zookeeper", state='i')
if config['daemon_mode'] == 'coordinator': if config['daemon_mode'] == 'coordinator':
init_routerstate = 'secondary' init_routerstate = 'secondary'
else: else:
init_routerstate = 'client' init_routerstate = 'client'
# Update static data just in case it's changed # Update static data just in case it's changed
zkhandler.writedata(zk_conn, { zkhandler.write([
'/nodes/{}/daemonmode'.format(myhostname): config['daemon_mode'], ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
'/nodes/{}/daemonstate'.format(myhostname): 'init', ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
'/nodes/{}/routerstate'.format(myhostname): init_routerstate, ('/nodes/{}/routerstate'.format(myhostname), init_routerstate),
'/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata), ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
# Keepalives and fencing information (always load and set from config on boot) # Keepalives and fencing information (always load and set from config on boot)
'/nodes/{}/ipmihostname'.format(myhostname): config['ipmi_hostname'], ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
'/nodes/{}/ipmiusername'.format(myhostname): config['ipmi_username'], ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
'/nodes/{}/ipmipassword'.format(myhostname): config['ipmi_password'] ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
}) ])
else: else:
logger.out("Node is " + fmt_red + "absent" + fmt_end + " in Zookeeper; adding new node", state='i') logger.out("Node is " + fmt_red + "absent" + fmt_end + " in Zookeeper; adding new node", state='i')
keepalive_time = int(time.time()) keepalive_time = int(time.time())
zkhandler.writedata(zk_conn, { zkhandler.write([
'/nodes/{}'.format(myhostname): config['daemon_mode'], ('/nodes/{}'.format(myhostname), config['daemon_mode']),
# Basic state information # Basic state information
'/nodes/{}/daemonmode'.format(myhostname): config['daemon_mode'], ('/nodes/{}/daemonmode'.format(myhostname), config['daemon_mode']),
'/nodes/{}/daemonstate'.format(myhostname): 'init', ('/nodes/{}/daemonstate'.format(myhostname), 'init'),
'/nodes/{}/routerstate'.format(myhostname): 'client', ('/nodes/{}/routerstate'.format(myhostname), 'client'),
'/nodes/{}/domainstate'.format(myhostname): 'flushed', ('/nodes/{}/domainstate'.format(myhostname), 'flushed'),
'/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata), ('/nodes/{}/staticdata'.format(myhostname), ' '.join(staticdata)),
'/nodes/{}/memtotal'.format(myhostname): '0', ('/nodes/{}/memtotal'.format(myhostname), '0'),
'/nodes/{}/memfree'.format(myhostname): '0', ('/nodes/{}/memfree'.format(myhostname), '0'),
'/nodes/{}/memused'.format(myhostname): '0', ('/nodes/{}/memused'.format(myhostname), '0'),
'/nodes/{}/memalloc'.format(myhostname): '0', ('/nodes/{}/memalloc'.format(myhostname), '0'),
'/nodes/{}/memprov'.format(myhostname): '0', ('/nodes/{}/memprov'.format(myhostname), '0'),
'/nodes/{}/vcpualloc'.format(myhostname): '0', ('/nodes/{}/vcpualloc'.format(myhostname), '0'),
'/nodes/{}/cpuload'.format(myhostname): '0.0', ('/nodes/{}/cpuload'.format(myhostname), '0.0'),
'/nodes/{}/networkscount'.format(myhostname): '0', ('/nodes/{}/networkscount'.format(myhostname), '0'),
'/nodes/{}/domainscount'.format(myhostname): '0', ('/nodes/{}/domainscount'.format(myhostname), '0'),
'/nodes/{}/runningdomains'.format(myhostname): '', ('/nodes/{}/runningdomains'.format(myhostname), ''),
# Keepalives and fencing information # Keepalives and fencing information
'/nodes/{}/keepalive'.format(myhostname): str(keepalive_time), ('/nodes/{}/keepalive'.format(myhostname), str(keepalive_time)),
'/nodes/{}/ipmihostname'.format(myhostname): config['ipmi_hostname'], ('/nodes/{}/ipmihostname'.format(myhostname), config['ipmi_hostname']),
'/nodes/{}/ipmiusername'.format(myhostname): config['ipmi_username'], ('/nodes/{}/ipmiusername'.format(myhostname), config['ipmi_username']),
'/nodes/{}/ipmipassword'.format(myhostname): config['ipmi_password'] ('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'])
}) ])
# Check that the primary key exists, and create it with us as master if not # Check that the primary key exists, and create it with us as master if not
try: try:
current_primary = zkhandler.readdata(zk_conn, '/primary_node') current_primary = zkhandler.read('/primary_node')
except kazoo.exceptions.NoNodeError: except kazoo.exceptions.NoNodeError:
current_primary = 'none' current_primary = 'none'
@ -711,7 +682,9 @@ if current_primary and current_primary != 'none':
else: else:
if config['daemon_mode'] == 'coordinator': if config['daemon_mode'] == 'coordinator':
logger.out('No primary node found; creating with us as primary.', state='i') logger.out('No primary node found; creating with us as primary.', state='i')
zkhandler.writedata(zk_conn, {'/primary_node': myhostname}) zkhandler.write([
('/primary_node', myhostname)
])
############################################################################### ###############################################################################
# PHASE 7a - Ensure IPMI is reachable and working # PHASE 7a - Ensure IPMI is reachable and working
@ -806,8 +779,8 @@ volume_list = dict() # Dict of Lists
if enable_networking: if enable_networking:
# Create an instance of the DNS Aggregator and Metadata API if we're a coordinator # Create an instance of the DNS Aggregator and Metadata API if we're a coordinator
if config['daemon_mode'] == 'coordinator': if config['daemon_mode'] == 'coordinator':
dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zk_conn, config, logger) dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(zkhandler.zk_conn, config, logger)
metadata_api = MetadataAPIInstance.MetadataAPIInstance(zk_conn, config, logger) metadata_api = MetadataAPIInstance.MetadataAPIInstance(zkhandler.zk_conn, config, logger)
else: else:
dns_aggregator = None dns_aggregator = None
metadata_api = None metadata_api = None
@ -817,14 +790,14 @@ else:
# Node objects # Node objects
@zk_conn.ChildrenWatch('/nodes') @zkhandler.zk_conn.ChildrenWatch('/nodes')
def update_nodes(new_node_list): def update_nodes(new_node_list):
global node_list, d_node global node_list, d_node
# Add any missing nodes to the list # Add any missing nodes to the list
for node in new_node_list: for node in new_node_list:
if node not in node_list: if node not in node_list:
d_node[node] = NodeInstance.NodeInstance(node, myhostname, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api) d_node[node] = NodeInstance.NodeInstance(node, myhostname, zkhandler.zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)
# Remove any deleted nodes from the list # Remove any deleted nodes from the list
for node in node_list: for node in node_list:
@ -846,7 +819,7 @@ this_node = d_node[myhostname]
# Maintenance mode # Maintenance mode
@zk_conn.DataWatch('/maintenance') @zkhandler.zk_conn.DataWatch('/maintenance')
def set_maintenance(_maintenance, stat, event=''): def set_maintenance(_maintenance, stat, event=''):
global maintenance global maintenance
try: try:
@ -856,7 +829,7 @@ def set_maintenance(_maintenance, stat, event=''):
# Primary node # Primary node
@zk_conn.DataWatch('/primary_node') @zkhandler.zk_conn.DataWatch('/primary_node')
def update_primary(new_primary, stat, event=''): def update_primary(new_primary, stat, event=''):
try: try:
new_primary = new_primary.decode('ascii') new_primary = new_primary.decode('ascii')
@ -871,7 +844,7 @@ def update_primary(new_primary, stat, event=''):
if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']: if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']:
logger.out('Contending for primary coordinator state', state='i') logger.out('Contending for primary coordinator state', state='i')
# Acquire an exclusive lock on the primary_node key # Acquire an exclusive lock on the primary_node key
primary_lock = zkhandler.exclusivelock(zk_conn, '/primary_node') primary_lock = zkhandler.exclusivelock('/primary_node')
try: try:
# This lock times out after 0.4s, which is 0.1s less than the pre-takeover # This lock times out after 0.4s, which is 0.1s less than the pre-takeover
# timeout below, thus ensuring that a primary takeover will not deadlock # timeout below, thus ensuring that a primary takeover will not deadlock
@ -879,23 +852,31 @@ def update_primary(new_primary, stat, event=''):
primary_lock.acquire(timeout=0.4) primary_lock.acquire(timeout=0.4)
# Ensure when we get the lock that the versions are still consistent and that # Ensure when we get the lock that the versions are still consistent and that
# another node hasn't already acquired primary state # another node hasn't already acquired primary state
if key_version == zk_conn.get('/primary_node')[1].version: if key_version == zkhandler.zk_conn.get('/primary_node')[1].version:
zkhandler.writedata(zk_conn, {'/primary_node': myhostname}) zkhandler.write([
('/primary_node', myhostname)
])
# Cleanly release the lock # Cleanly release the lock
primary_lock.release() primary_lock.release()
# We timed out acquiring a lock, which means we failed contention, so just pass # We timed out acquiring a lock, which means we failed contention, so just pass
except kazoo.exceptions.LockTimeout: except Exception:
pass pass
elif new_primary == myhostname: elif new_primary == myhostname:
if this_node.router_state == 'secondary': if this_node.router_state == 'secondary':
time.sleep(0.5) time.sleep(0.5)
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'takeover'}) zkhandler.write([
('/nodes/{}/routerstate'.format(myhostname), 'takeover')
])
else: else:
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
time.sleep(0.5) time.sleep(0.5)
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'relinquish'}) zkhandler.write([
('/nodes/{}/routerstate'.format(myhostname), 'relinquish')
])
else: else:
zkhandler.writedata(zk_conn, {'/nodes/{}/routerstate'.format(myhostname): 'client'}) zkhandler.write([
('/nodes/{}/routerstate'.format(myhostname), 'client')
])
for node in d_node: for node in d_node:
d_node[node].primary_node = new_primary d_node[node].primary_node = new_primary
@ -903,14 +884,14 @@ def update_primary(new_primary, stat, event=''):
if enable_networking: if enable_networking:
# Network objects # Network objects
@zk_conn.ChildrenWatch('/networks') @zkhandler.zk_conn.ChildrenWatch('/networks')
def update_networks(new_network_list): def update_networks(new_network_list):
global network_list, d_network global network_list, d_network
# Add any missing networks to the list # Add any missing networks to the list
for network in new_network_list: for network in new_network_list:
if network not in network_list: if network not in network_list:
d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zk_conn, config, logger, this_node, dns_aggregator) d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zkhandler.zk_conn, config, logger, this_node, dns_aggregator)
if config['daemon_mode'] == 'coordinator' and d_network[network].nettype == 'managed': if config['daemon_mode'] == 'coordinator' and d_network[network].nettype == 'managed':
try: try:
dns_aggregator.add_network(d_network[network]) dns_aggregator.add_network(d_network[network])
@ -946,20 +927,20 @@ if enable_networking:
if enable_hypervisor: if enable_hypervisor:
# VM command pipeline key # VM command pipeline key
@zk_conn.DataWatch('/cmd/domains') @zkhandler.zk_conn.DataWatch('/cmd/domains')
def cmd_domains(data, stat, event=''): def cmd_domains(data, stat, event=''):
if data: if data:
VMInstance.run_command(zk_conn, logger, this_node, data.decode('ascii')) VMInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii'))
# VM domain objects # VM domain objects
@zk_conn.ChildrenWatch('/domains') @zkhandler.zk_conn.ChildrenWatch('/domains')
def update_domains(new_domain_list): def update_domains(new_domain_list):
global domain_list, d_domain global domain_list, d_domain
# Add any missing domains to the list # Add any missing domains to the list
for domain in new_domain_list: for domain in new_domain_list:
if domain not in domain_list: if domain not in domain_list:
d_domain[domain] = VMInstance.VMInstance(domain, zk_conn, config, logger, this_node) d_domain[domain] = VMInstance.VMInstance(domain, zkhandler.zk_conn, config, logger, this_node)
# Remove any deleted domains from the list # Remove any deleted domains from the list
for domain in domain_list: for domain in domain_list:
@ -977,20 +958,20 @@ if enable_hypervisor:
if enable_storage: if enable_storage:
# Ceph command pipeline key # Ceph command pipeline key
@zk_conn.DataWatch('/cmd/ceph') @zkhandler.zk_conn.DataWatch('/cmd/ceph')
def cmd_ceph(data, stat, event=''): def cmd_ceph(data, stat, event=''):
if data: if data:
CephInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'), d_osd) CephInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii'), d_osd)
# OSD objects # OSD objects
@zk_conn.ChildrenWatch('/ceph/osds') @zkhandler.zk_conn.ChildrenWatch('/ceph/osds')
def update_osds(new_osd_list): def update_osds(new_osd_list):
global osd_list, d_osd global osd_list, d_osd
# Add any missing OSDs to the list # Add any missing OSDs to the list
for osd in new_osd_list: for osd in new_osd_list:
if osd not in osd_list: if osd not in osd_list:
d_osd[osd] = CephInstance.CephOSDInstance(zk_conn, this_node, osd) d_osd[osd] = CephInstance.CephOSDInstance(zkhandler.zk_conn, this_node, osd)
# Remove any deleted OSDs from the list # Remove any deleted OSDs from the list
for osd in osd_list: for osd in osd_list:
@ -1003,14 +984,14 @@ if enable_storage:
logger.out('{}OSD list:{} {}'.format(fmt_blue, fmt_end, ' '.join(osd_list)), state='i') logger.out('{}OSD list:{} {}'.format(fmt_blue, fmt_end, ' '.join(osd_list)), state='i')
# Pool objects # Pool objects
@zk_conn.ChildrenWatch('/ceph/pools') @zkhandler.zk_conn.ChildrenWatch('/ceph/pools')
def update_pools(new_pool_list): def update_pools(new_pool_list):
global pool_list, d_pool global pool_list, d_pool
# Add any missing Pools to the list # Add any missing Pools to the list
for pool in new_pool_list: for pool in new_pool_list:
if pool not in pool_list: if pool not in pool_list:
d_pool[pool] = CephInstance.CephPoolInstance(zk_conn, this_node, pool) d_pool[pool] = CephInstance.CephPoolInstance(zkhandler.zk_conn, this_node, pool)
d_volume[pool] = dict() d_volume[pool] = dict()
volume_list[pool] = [] volume_list[pool] = []
@ -1026,14 +1007,14 @@ if enable_storage:
# Volume objects in each pool # Volume objects in each pool
for pool in pool_list: for pool in pool_list:
@zk_conn.ChildrenWatch('/ceph/volumes/{}'.format(pool)) @zkhandler.zk_conn.ChildrenWatch('/ceph/volumes/{}'.format(pool))
def update_volumes(new_volume_list): def update_volumes(new_volume_list):
global volume_list, d_volume global volume_list, d_volume
# Add any missing Volumes to the list # Add any missing Volumes to the list
for volume in new_volume_list: for volume in new_volume_list:
if volume not in volume_list[pool]: if volume not in volume_list[pool]:
d_volume[pool][volume] = CephInstance.CephVolumeInstance(zk_conn, this_node, pool, volume) d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler.zk_conn, this_node, pool, volume)
# Remove any deleted Volumes from the list # Remove any deleted Volumes from the list
for volume in volume_list[pool]: for volume in volume_list[pool]:
@ -1092,9 +1073,9 @@ def collect_ceph_stats(queue):
command = {"prefix": "status", "format": "pretty"} command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii') ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try: try:
zkhandler.writedata(zk_conn, { zkhandler.write([
'/ceph': str(ceph_status) ('/ceph', str(ceph_status))
}) ])
except Exception as e: except Exception as e:
logger.out('Failed to set Ceph status data: {}'.format(e), state='e') logger.out('Failed to set Ceph status data: {}'.format(e), state='e')
return return
@ -1106,9 +1087,9 @@ def collect_ceph_stats(queue):
command = {"prefix": "df", "format": "pretty"} command = {"prefix": "df", "format": "pretty"}
ceph_df = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii') ceph_df = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try: try:
zkhandler.writedata(zk_conn, { zkhandler.write([
'/ceph/util': str(ceph_df) ('/ceph/util', str(ceph_df))
}) ])
except Exception as e: except Exception as e:
logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e') logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e')
return return
@ -1171,9 +1152,9 @@ def collect_ceph_stats(queue):
} }
# Write the pool data to Zookeeper # Write the pool data to Zookeeper
zkhandler.writedata(zk_conn, { zkhandler.write([
'/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df)) ('/ceph/pools/{}/stats'.format(pool['name']), str(json.dumps(pool_df)))
}) ])
except Exception as e: except Exception as e:
# One or more of the status commands timed out, just continue # One or more of the status commands timed out, just continue
logger.out('Failed to format and send pool data: {}'.format(e), state='w') logger.out('Failed to format and send pool data: {}'.format(e), state='w')
@ -1307,9 +1288,9 @@ def collect_ceph_stats(queue):
for osd in osd_list: for osd in osd_list:
try: try:
stats = json.dumps(osd_stats[osd]) stats = json.dumps(osd_stats[osd])
zkhandler.writedata(zk_conn, { zkhandler.write([
'/ceph/osds/{}/stats'.format(osd): str(stats) ('/ceph/osds/{}/stats'.format(osd), str(stats))
}) ])
except KeyError as e: except KeyError as e:
# One or more of the status commands timed out, just continue # One or more of the status commands timed out, just continue
logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w') logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
@ -1374,7 +1355,9 @@ def collect_vm_stats(queue):
except Exception: except Exception:
# Toggle a state "change" # Toggle a state "change"
logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread') logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(domain): instance.getstate()}) zkhandler.write([
('/domains/{}/state'.format(domain), instance.getstate())
])
elif instance.getnode() == this_node.name: elif instance.getnode() == this_node.name:
memprov += instance.getmemory() memprov += instance.getmemory()
@ -1464,9 +1447,9 @@ def collect_vm_stats(queue):
logger.out("Writing statistics for VM {} to Zookeeper".format(domain_name), state='d', prefix='vm-thread') logger.out("Writing statistics for VM {} to Zookeeper".format(domain_name), state='d', prefix='vm-thread')
try: try:
zkhandler.writedata(zk_conn, { zkhandler.write([
"/domains/{}/stats".format(domain_uuid): str(json.dumps(domain_stats)) ("/domains/{}/stats".format(domain_uuid), str(json.dumps(domain_stats)))
}) ])
except Exception as e: except Exception as e:
if debug: if debug:
logger.out("{}".format(e), state='d', prefix='vm-thread') logger.out("{}".format(e), state='d', prefix='vm-thread')
@ -1492,18 +1475,22 @@ def node_keepalive():
if config['enable_networking']: if config['enable_networking']:
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
try: try:
if zkhandler.readdata(zk_conn, '/upstream_ip') != config['upstream_floating_ip']: if zkhandler.read('/upstream_ip') != config['upstream_floating_ip']:
raise raise
except Exception: except Exception:
zkhandler.writedata(zk_conn, {'/upstream_ip': config['upstream_floating_ip']}) zkhandler.write([
('/upstream_ip', config['upstream_floating_ip'])
])
# Get past state and update if needed # Get past state and update if needed
if debug: if debug:
logger.out("Get past state and update if needed", state='d', prefix='main-thread') logger.out("Get past state and update if needed", state='d', prefix='main-thread')
past_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(this_node.name)) past_state = zkhandler.read('/nodes/{}/daemonstate'.format(this_node.name))
if past_state != 'run': if past_state != 'run':
this_node.daemon_state = 'run' this_node.daemon_state = 'run'
zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(this_node.name): 'run'}) zkhandler.write([
('/nodes/{}/daemonstate'.format(this_node.name), 'run')
])
else: else:
this_node.daemon_state = 'run' this_node.daemon_state = 'run'
@ -1511,8 +1498,10 @@ def node_keepalive():
if debug: if debug:
logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread') logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name: if zkhandler.read('/primary_node') != this_node.name:
zkhandler.writedata(zk_conn, {'/primary_node': this_node.name}) zkhandler.write([
('/primary_node', this_node.name)
])
# Run VM statistics collection in separate thread for parallelization # Run VM statistics collection in separate thread for parallelization
if enable_hypervisor: if enable_hypervisor:
@ -1572,18 +1561,18 @@ def node_keepalive():
if debug: if debug:
logger.out("Set our information in zookeeper", state='d', prefix='main-thread') logger.out("Set our information in zookeeper", state='d', prefix='main-thread')
try: try:
zkhandler.writedata(zk_conn, { zkhandler.write([
'/nodes/{}/memtotal'.format(this_node.name): str(this_node.memtotal), ('/nodes/{}/memtotal'.format(this_node.name), str(this_node.memtotal)),
'/nodes/{}/memused'.format(this_node.name): str(this_node.memused), ('/nodes/{}/memused'.format(this_node.name), str(this_node.memused)),
'/nodes/{}/memfree'.format(this_node.name): str(this_node.memfree), ('/nodes/{}/memfree'.format(this_node.name), str(this_node.memfree)),
'/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc), ('/nodes/{}/memalloc'.format(this_node.name), str(this_node.memalloc)),
'/nodes/{}/memprov'.format(this_node.name): str(this_node.memprov), ('/nodes/{}/memprov'.format(this_node.name), str(this_node.memprov)),
'/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc), ('/nodes/{}/vcpualloc'.format(this_node.name), str(this_node.vcpualloc)),
'/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload), ('/nodes/{}/cpuload'.format(this_node.name), str(this_node.cpuload)),
'/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count), ('/nodes/{}/domainscount'.format(this_node.name), str(this_node.domains_count)),
'/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list), ('/nodes/{}/runningdomains'.format(this_node.name), ' '.join(this_node.domain_list)),
'/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time) ('/nodes/{}/keepalive'.format(this_node.name), str(keepalive_time))
}) ])
except Exception: except Exception:
logger.out('Failed to set keepalive data', state='e') logger.out('Failed to set keepalive data', state='e')
return return
@ -1652,8 +1641,8 @@ def node_keepalive():
if config['daemon_mode'] == 'coordinator': if config['daemon_mode'] == 'coordinator':
for node_name in d_node: for node_name in d_node:
try: try:
node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) node_daemon_state = zkhandler.read('/nodes/{}/daemonstate'.format(node_name))
node_keepalive = int(zkhandler.readdata(zk_conn, '/nodes/{}/keepalive'.format(node_name))) node_keepalive = int(zkhandler.read('/nodes/{}/keepalive'.format(node_name)))
except Exception: except Exception:
node_daemon_state = 'unknown' node_daemon_state = 'unknown'
node_keepalive = 0 node_keepalive = 0
@ -1664,15 +1653,17 @@ def node_keepalive():
node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals'])) node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
if node_keepalive < node_deadtime and node_daemon_state == 'run': if node_keepalive < node_deadtime and node_daemon_state == 'run':
logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w') logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
zk_lock = zkhandler.writelock(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) zk_lock = zkhandler.writelock('/nodes/{}/daemonstate'.format(node_name))
with zk_lock: with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting, # Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread. # we won't try to trigger our own fence thread.
if zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) != 'dead': if zkhandler.read('/nodes/{}/daemonstate'.format(node_name)) != 'dead':
fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zk_conn, config, logger), kwargs={}) fence_thread = Thread(target=fencing.fenceNode, args=(node_name, zkhandler.zk_conn, config, logger), kwargs={})
fence_thread.start() fence_thread.start()
# Write the updated data after we start the fence thread # Write the updated data after we start the fence thread
zkhandler.writedata(zk_conn, {'/nodes/{}/daemonstate'.format(node_name): 'dead'}) zkhandler.write([
('/nodes/{}/daemonstate'.format(node_name), 'dead')
])
if debug: if debug:
logger.out("Keepalive finished", state='d', prefix='main-thread') logger.out("Keepalive finished", state='d', prefix='main-thread')