#!/usr/bin/env python3

# zkhandler.py - Secure versioned ZooKeeper updates
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import os
import time
import uuid
import json
import re

from functools import wraps

from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError


#
# Function decorators
#
class ZKConnection(object):
    """
    Decorates a function with a Zookeeper connection before and after the main call.

    The decorated function must accept the `zkhandler` argument as its first argument, and
    then use this to access the connection.
    """
    def __init__(self, config):
        self.config = config

    def __call__(self, function):
        if not callable(function):
            return

        @wraps(function)
        def connection(*args, **kwargs):
            zkhandler = ZKHandler(self.config)
            zkhandler.connect()

            schema_version = zkhandler.read('base.schema.version')
            if schema_version is None:
                schema_version = 0
            zkhandler.schema.load(schema_version, quiet=True)

            ret = function(zkhandler, *args, **kwargs)

            zkhandler.disconnect()
            del zkhandler

            return ret

        return connection
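
# Usage sketch (illustrative, not executed here): a function wrapped with
# ZKConnection receives a freshly connected ZKHandler as its first argument and
# should not manage the connection itself. The config dict and the wrapped
# function below are hypothetical examples.
#
#   @ZKConnection(config)
#   def get_maintenance(zkhandler):
#       return zkhandler.read('base.config.maintenance')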


#
# Exceptions
#
class ZKConnectionException(Exception):
    """
    An exception raised when a connection to the cluster fails
    """
    def __init__(self, zkhandler, error=None):
        if error is not None:
            self.message = "Failed to connect to Zookeeper at {}: {}".format(zkhandler.coordinators(), error)
        else:
            self.message = "Failed to connect to Zookeeper at {}".format(zkhandler.coordinators())
        zkhandler.disconnect()

    def __str__(self):
        return str(self.message)


#
# Handler class
#
class ZKHandler(object):
    def __init__(self, config, logger=None):
        """
        Initialize an instance of the ZKHandler class with config

        A zk_conn object will be created but not started

        A ZKSchema instance will be created
        """
        self.encoding = 'utf8'
        self.coordinators = config['coordinators']
        self.logger = logger
        self.zk_conn = KazooClient(hosts=self.coordinators)
        self._schema = ZKSchema()

    #
    # Class meta-functions
    #
    def coordinators(self):
        return str(self.coordinators)

    def log(self, message, state=''):
        if self.logger is not None:
            self.logger.out(message, state)
        else:
            print(message)

    #
    # Properties
    #
    @property
    def schema(self):
        return self._schema

    #
    # State/connection management
    #
    def listener(self, state):
        """
        Listen for KazooState changes and log accordingly.

        This function only logs the state change; Kazoo itself handles reconnection.
        """
        if state == KazooState.CONNECTED:
            self.log('Connection to Zookeeper resumed', state='o')
        else:
            self.log('Connection to Zookeeper lost with state {}'.format(state), state='w')

    def connect(self, persistent=False):
        """
        Start the zk_conn object and connect to the cluster
        """
        try:
            self.zk_conn.start()
            self.log('Connection to Zookeeper started', state='o')
            if persistent:
                self.zk_conn.add_listener(self.listener)
        except Exception as e:
            raise ZKConnectionException(self, e)

    def disconnect(self):
        """
        Stop and close the zk_conn object and disconnect from the cluster

        The class instance may be reused later (avoids persistent connections)
        """
        self.zk_conn.stop()
        self.zk_conn.close()
        self.log('Connection to Zookeeper terminated', state='o')

    #
    # Schema helper actions
    #
    def get_schema_path(self, key):
        """
        Get the Zookeeper path for {key} from the current schema based on its format.

        If {key} is a tuple of length 2, it's treated as a path plus an item instance of that path (e.g. a node, a VM, etc.).

        If {key} is a tuple of length 4, it is treated as a path plus an item instance, as well as another item instance of the subpath.

        If {key} is just a string, it's treated as a lone path (mostly used for the 'base' schema group).

        Otherwise, returns None since this is not a valid key.

        This function also handles the special case where a string that looks like an existing path (i.e. starts with '/') is passed;
        in that case it will silently return the same path back. This exists mostly to support migration and is deprecated.
        """
        if isinstance(key, tuple):
            # This is a key tuple with both an ipath and an item
            if len(key) == 2:
                # 2-length normal tuple
                ipath, item = key
            elif len(key) == 4:
                # 4-length sub-level tuple
                ipath, item, sub_ipath, sub_item = key
                return self.schema.path(ipath, item=item) + self.schema.path(sub_ipath, item=sub_item)
            else:
                # This is an invalid key
                return None
        elif isinstance(key, str):
            # This is a key string with just an ipath
            ipath = key
            item = None

            # This is a raw key path, used by backup/restore functionality
            if re.match(r'^/', ipath):
                return ipath
        else:
            # This is an invalid key
            return None

        return self.schema.path(ipath, item=item)
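
    # Key-format sketch (illustrative): these examples show how the current
    # schema (defined in ZKSchema below) would resolve keys; the node name 'hv1',
    # the VNI '100' and the MAC address are hypothetical item names.
    #
    #   self.get_schema_path('base.config.maintenance')
    #       -> '/config/maintenance'
    #   self.get_schema_path(('node.state.daemon', 'hv1'))
    #       -> '/nodes/hv1/daemonstate'
    #   self.get_schema_path(('network.reservation', '100', 'reservation.ip', '52:54:00:00:00:01'))
    #       -> '/networks/100/dhcp4_reservations/52:54:00:00:00:01/ipaddr'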

    #
    # Key Actions
    #
    def exists(self, key):
        """
        Check if a key exists
        """
        path = self.get_schema_path(key)
        if path is None:
            # This path is invalid; this is likely due to missing schema entries, so return False
            return False

        stat = self.zk_conn.exists(path)
        if stat:
            return True
        else:
            return False

    def read(self, key):
        """
        Read data from a key
        """
        try:
            path = self.get_schema_path(key)
            if path is None:
                # This path is invalid; this is likely due to missing schema entries, so return None
                return None

            return self.zk_conn.get(path)[0].decode(self.encoding)
        except NoNodeError:
            return None

    def write(self, kvpairs):
        """
        Create or update one or more keys' data
        """
        if type(kvpairs) is not list:
            self.log("ZKHandler error: Key-value sequence is not a list", state='e')
            return False

        transaction = self.zk_conn.transaction()

        for kvpair in (kvpairs):
            if type(kvpair) is not tuple:
                self.log("ZKHandler error: Key-value pair '{}' is not a tuple".format(kvpair), state='e')
                return False

            key = kvpair[0]
            value = kvpair[1]

            path = self.get_schema_path(key)
            if path is None:
                # This path is invalid; this is likely due to missing schema entries, so continue
                continue

            if not self.exists(key):
                # Creating a new key
                transaction.create(path, str(value).encode(self.encoding))

            else:
                # Updating an existing key
                data = self.zk_conn.get(path)
                version = data[1].version

                # Validate the expected version after the execution
                new_version = version + 1

                # Update the data
                transaction.set_data(path, str(value).encode(self.encoding))

                # Check the data
                try:
                    transaction.check(path, new_version)
                except TypeError:
                    self.log("ZKHandler error: Key '{}' does not match expected version".format(path), state='e')
                    return False

        try:
            transaction.commit()
            return True
        except Exception as e:
            self.log("ZKHandler error: Failed to commit transaction: {}".format(e), state='e')
            return False
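
    # Usage sketch (illustrative): write() takes a list of (key, value) tuples
    # and commits them as a single transaction; the node name 'hv1' and the
    # values shown are hypothetical.
    #
    #   zkhandler.write([
    #       ('base.config.maintenance', 'false'),
    #       (('node.state.daemon', 'hv1'), 'run'),
    #   ])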

    def delete(self, keys, recursive=True):
        """
        Delete a key or list of keys (defaults to recursive)
        """
        if type(keys) is not list:
            keys = [keys]

        for key in keys:
            if self.exists(key):
                try:
                    path = self.get_schema_path(key)
                    self.zk_conn.delete(path, recursive=recursive)
                except Exception as e:
                    self.log("ZKHandler error: Failed to delete key {}: {}".format(path, e), state='e')
                    return False

        return True

    def children(self, key):
        """
        Lists all children of a key
        """
        try:
            path = self.get_schema_path(key)
            if path is None:
                # This path is invalid; this is likely due to missing schema entries, so return None
                return None

            return self.zk_conn.get_children(path)
        except NoNodeError:
            return None

    def rename(self, kkpairs):
        """
        Rename one or more keys to a new value
        """
        if type(kkpairs) is not list:
            self.log("ZKHandler error: Key-key sequence is not a list", state='e')
            return False

        transaction = self.zk_conn.transaction()

        def rename_element(transaction, source_path, destination_path):
            data = self.zk_conn.get(source_path)[0]
            transaction.create(destination_path, data)

            if self.children(source_path):
                for child_path in self.children(source_path):
                    child_source_path = "{}/{}".format(source_path, child_path)
                    child_destination_path = "{}/{}".format(destination_path, child_path)
                    rename_element(transaction, child_source_path, child_destination_path)

            transaction.delete(source_path)

        for kkpair in (kkpairs):
            if type(kkpair) is not tuple:
                self.log("ZKHandler error: Key-key pair '{}' is not a tuple".format(kkpair), state='e')
                return False

            source_key = kkpair[0]
            source_path = self.get_schema_path(source_key)
            if source_path is None:
                # This path is invalid; this is likely due to missing schema entries, so continue
                continue

            destination_key = kkpair[1]
            destination_path = self.get_schema_path(destination_key)
            if destination_path is None:
                # This path is invalid; this is likely due to missing schema entries, so continue
                continue

            if not self.exists(source_key):
                self.log("ZKHandler error: Source key '{}' does not exist".format(source_path), state='e')
                return False

            if self.exists(destination_key):
                self.log("ZKHandler error: Destination key '{}' already exists".format(destination_path), state='e')
                return False

            rename_element(transaction, source_path, destination_path)

        try:
            transaction.commit()
            return True
        except Exception as e:
            self.log("ZKHandler error: Failed to commit transaction: {}".format(e), state='e')
            return False
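
    # Usage sketch (illustrative): rename() takes a list of (source_key,
    # destination_key) tuples and recursively moves each tree; the UUID variables
    # here are hypothetical placeholders.
    #
    #   zkhandler.rename([
    #       (('domain', old_uuid), ('domain', new_uuid)),
    #   ])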

    #
    # Lock actions
    #
    def readlock(self, key):
        """
        Acquires a read lock on a key
        """
        count = 1
        lock = None

        path = self.get_schema_path(key)

        while True:
            try:
                lock_id = str(uuid.uuid1())
                lock = self.zk_conn.ReadLock(path, lock_id)
                break
            except NoNodeError:
                self.log("ZKHandler warning: Failed to acquire read lock on nonexistent path {}".format(path), state='e')
                return None
            except Exception as e:
                if count > 5:
                    self.log("ZKHandler warning: Failed to acquire read lock after 5 tries: {}".format(e), state='e')
                    break
                else:
                    time.sleep(0.5)
                    count += 1
                    continue

        return lock

    def writelock(self, key):
        """
        Acquires a write lock on a key
        """
        count = 1
        lock = None

        path = self.get_schema_path(key)

        while True:
            try:
                lock_id = str(uuid.uuid1())
                lock = self.zk_conn.WriteLock(path, lock_id)
                break
            except NoNodeError:
                self.log("ZKHandler warning: Failed to acquire write lock on nonexistent path {}".format(path), state='e')
                return None
            except Exception as e:
                if count > 5:
                    self.log("ZKHandler warning: Failed to acquire write lock after 5 tries: {}".format(e), state='e')
                    break
                else:
                    time.sleep(0.5)
                    count += 1
                    continue

        return lock

    def exclusivelock(self, key):
        """
        Acquires an exclusive lock on a key
        """
        count = 1
        lock = None

        path = self.get_schema_path(key)

        while True:
            try:
                lock_id = str(uuid.uuid1())
                lock = self.zk_conn.Lock(path, lock_id)
                break
            except NoNodeError:
                self.log("ZKHandler warning: Failed to acquire exclusive lock on nonexistent path {}".format(path), state='e')
                return None
            except Exception as e:
                if count > 5:
                    self.log("ZKHandler warning: Failed to acquire exclusive lock after 5 tries: {}".format(e), state='e')
                    break
                else:
                    time.sleep(0.5)
                    count += 1
                    continue

        return lock
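
    # Usage sketch (illustrative): the returned object is a Kazoo lock recipe
    # instance that the caller acquires and releases; the key shown is a real
    # schema key, but the surrounding logic is a hypothetical example.
    #
    #   lock = zkhandler.exclusivelock('base.config.primary_node.sync_lock')
    #   if lock is not None:
    #       lock.acquire()
    #       try:
    #           ...  # perform the protected operation
    #       finally:
    #           lock.release()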


#
# Schema classes
#
class ZKSchema(object):
    # Current version
    _version = 2

    # Root for doing nested keys
    _schema_root = ''

    # Primary schema definition for the current version
    _schema = {
        'version': f'{_version}',
        'root': f'{_schema_root}',
        # Base schema defining core keys; this is all that is initialized on cluster init()
        'base': {
            'root': f'{_schema_root}',
            'schema': f'{_schema_root}/schema',
            'schema.version': f'{_schema_root}/schema/version',
            'config': f'{_schema_root}/config',
            'config.maintenance': f'{_schema_root}/config/maintenance',
            'config.primary_node': f'{_schema_root}/config/primary_node',
            'config.primary_node.sync_lock': f'{_schema_root}/config/primary_node/sync_lock',
            'config.upstream_ip': f'{_schema_root}/config/upstream_ip',
            'config.migration_target_selector': f'{_schema_root}/config/migration_target_selector',
            'cmd': f'{_schema_root}/cmd',
            'cmd.node': f'{_schema_root}/cmd/nodes',
            'cmd.domain': f'{_schema_root}/cmd/domains',
            'cmd.ceph': f'{_schema_root}/cmd/ceph',
            'node': f'{_schema_root}/nodes',
            'domain': f'{_schema_root}/domains',
            'network': f'{_schema_root}/networks',
            'storage': f'{_schema_root}/ceph',
            'storage.util': f'{_schema_root}/ceph/util',
            'osd': f'{_schema_root}/ceph/osds',
            'pool': f'{_schema_root}/ceph/pools',
            'volume': f'{_schema_root}/ceph/volumes',
            'snapshot': f'{_schema_root}/ceph/snapshots',
        },
        # The schema of an individual node entry (/nodes/{node_name})
        'node': {
            'name': '',  # The root key
            'keepalive': '/keepalive',
            'mode': '/daemonmode',
            'data.active_schema': '/activeschema',
            'data.latest_schema': '/latestschema',
            'data.static': '/staticdata',
            'data.pvc_version': '/pvcversion',
            'running_domains': '/runningdomains',
            'count.provisioned_domains': '/domainscount',
            'count.networks': '/networkscount',
            'state.daemon': '/daemonstate',
            'state.router': '/routerstate',
            'state.domain': '/domainstate',
            'cpu.load': '/cpuload',
            'vcpu.allocated': '/vcpualloc',
            'memory.total': '/memtotal',
            'memory.used': '/memused',
            'memory.free': '/memfree',
            'memory.allocated': '/memalloc',
            'memory.provisioned': '/memprov',
            'ipmi.hostname': '/ipmihostname',
            'ipmi.username': '/ipmiusername',
            'ipmi.password': '/ipmipassword',
            'sriov': '/sriov',
            'sriov.pf': '/sriov/pf',
            'sriov.vf': '/sriov/vf',
        },
        # The schema of an individual SR-IOV PF entry (/nodes/{node_name}/sriov/pf/{pf})
        'sriov_pf': {
            'phy': '',  # The root key
            'mtu': '/mtu',
            'vfcount': '/vfcount'
        },
        # The schema of an individual SR-IOV VF entry (/nodes/{node_name}/sriov/vf/{vf})
        'sriov_vf': {
            'phy': '',  # The root key
            'pf': '/pf',
            'mtu': '/mtu',
            'mac': '/mac',
            'phy_mac': '/phy_mac',
            'config': '/config',
            'config.vlan_id': '/config/vlan_id',
            'config.vlan_qos': '/config/vlan_qos',
            'config.tx_rate_min': '/config/tx_rate_min',
            'config.tx_rate_max': '/config/tx_rate_max',
            'config.spoof_check': '/config/spoof_check',
            'config.link_state': '/config/link_state',
            'config.trust': '/config/trust',
            'config.query_rss': '/config/query_rss',
            'pci': '/pci',
            'pci.domain': '/pci/domain',
            'pci.bus': '/pci/bus',
            'pci.slot': '/pci/slot',
            'pci.function': '/pci/function',
            'used': '/used',
            'used_by': '/used_by'
        },
        # The schema of an individual domain entry (/domains/{domain_uuid})
        'domain': {
            'name': '',  # The root key
            'xml': '/xml',
            'state': '/state',
            'profile': '/profile',
            'stats': '/stats',
            'node': '/node',
            'last_node': '/lastnode',
            'failed_reason': '/failedreason',
            'storage.volumes': '/rbdlist',
            'console.log': '/consolelog',
            'console.vnc': '/vnc',
            'meta.autostart': '/node_autostart',
            'meta.migrate_method': '/migration_method',
            'meta.node_selector': '/node_selector',
            'meta.node_limit': '/node_limit',
            'migrate.sync_lock': '/migrate_sync_lock'
        },
        # The schema of an individual network entry (/networks/{vni})
        'network': {
            'vni': '',  # The root key
            'type': '/nettype',
            'rule': '/firewall_rules',
            'rule.in': '/firewall_rules/in',
            'rule.out': '/firewall_rules/out',
            'nameservers': '/name_servers',
            'domain': '/domain',
            'reservation': '/dhcp4_reservations',
            'lease': '/dhcp4_leases',
            'ip4.gateway': '/ip4_gateway',
            'ip4.network': '/ip4_network',
            'ip4.dhcp': '/dhcp4_flag',
            'ip4.dhcp_start': '/dhcp4_start',
            'ip4.dhcp_end': '/dhcp4_end',
            'ip6.gateway': '/ip6_gateway',
            'ip6.network': '/ip6_network',
            'ip6.dhcp': '/dhcp6_flag'
        },
        # The schema of an individual network DHCP(v4) reservation entry (/networks/{vni}/dhcp4_reservations/{mac})
        'reservation': {
            'mac': '',  # The root key
            'ip': '/ipaddr',
            'hostname': '/hostname'
        },
        # The schema of an individual network DHCP(v4) lease entry (/networks/{vni}/dhcp4_leases/{mac})
        'lease': {
            'mac': '',  # The root key
            'ip': '/ipaddr',
            'hostname': '/hostname',
            'expiry': '/expiry',
            'client_id': '/clientid'
        },
        # The schema for an individual network ACL entry (/networks/{vni}/firewall_rules/(in|out)/{acl})
        'rule': {
            'description': '',  # The root key
            'rule': '/rule',
            'order': '/order'
        },
        # The schema of an individual OSD entry (/ceph/osds/{osd_id})
        'osd': {
            'id': '',  # The root key
            'node': '/node',
            'device': '/device',
            'stats': '/stats'
        },
        # The schema of an individual pool entry (/ceph/pools/{pool_name})
        'pool': {
            'name': '',  # The root key
            'pgs': '/pgs',
            'stats': '/stats'
        },
        # The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name})
        'volume': {
            'name': '',  # The root key
            'stats': '/stats'
        },
        # The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name})
        'snapshot': {
            'name': '',  # The root key
            'stats': '/stats'
        }
    }

    # Properties
    @property
    def schema_root(self):
        return self._schema_root

    @schema_root.setter
    def schema_root(self, schema_root):
        self._schema_root = schema_root

    @property
    def version(self):
        return int(self._version)

    @version.setter
    def version(self, version):
        self._version = int(version)

    @property
    def schema(self):
        return self._schema

    @schema.setter
    def schema(self, schema):
        self._schema = schema

    def __init__(self):
        pass

    def __repr__(self):
        return f'ZKSchema({self.version})'

    def __lt__(self, other):
        if self.version < other.version:
            return True
        else:
            return False

    def __le__(self, other):
        if self.version <= other.version:
            return True
        else:
            return False

    def __gt__(self, other):
        if self.version > other.version:
            return True
        else:
            return False

    def __ge__(self, other):
        if self.version >= other.version:
            return True
        else:
            return False

    def __eq__(self, other):
        if self.version == other.version:
            return True
        else:
            return False

    # Load the schema of a given version from a file
    def load(self, version, quiet=False):
        if not quiet:
            print(f'Loading schema version {version}')

        with open(f'daemon_lib/migrations/versions/{version}.json', 'r') as sfh:
            self.schema = json.load(sfh)
            self.version = self.schema.get('version')

    # Get key paths
    def path(self, ipath, item=None):
        itype, *ipath = ipath.split('.')

        if item is None:
            return self.schema.get(itype).get('.'.join(ipath))
        else:
            base_path = self.schema.get('base').get(itype, None)
            if base_path is None:
                # This should only really happen for second-layer key types where the helper functions join them together
                base_path = ''

            if not ipath:
                # This is a root path
                return f'{base_path}/{item}'

            sub_path = self.schema.get(itype).get('.'.join(ipath))
            if sub_path is None:
                # We didn't find the path we're looking for, so we don't want to do anything
                return None

            return f'{base_path}/{item}{sub_path}'

    # Get keys of a schema location
    def keys(self, itype=None):
        if itype is None:
            return list(self.schema.get('base').keys())
        else:
            return list(self.schema.get(itype).keys())

    # Get the active version of a cluster's schema
    def get_version(self, zkhandler):
        try:
            current_version = zkhandler.read(self.path('base.schema.version'))
        except NoNodeError:
            current_version = 0
        return current_version

    # Validate an active schema against a Zookeeper cluster
    def validate(self, zkhandler, logger=None):
        result = True

        # Walk the entire tree checking our schema
        for elem in ['base']:
            for key in self.keys(elem):
                kpath = f'{elem}.{key}'
                if not zkhandler.zk_conn.exists(self.path(kpath)):
                    if logger is not None:
                        logger.out(f'Key not found: {self.path(kpath)}', state='w')
                    result = False

        for elem in ['node', 'domain', 'network', 'osd', 'pool']:
            # First read all the subelements of the key class
            for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # For each key in the schema for that particular elem
                for ikey in self.keys(elem):
                    kpath = f'{elem}.{ikey}'
                    # Validate that the key exists for that child
                    if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                        if logger is not None:
                            logger.out(f'Key not found: {self.path(kpath, child)}', state='w')
                        result = False

                    # Continue for child keys under network (reservation, acl)
                    if elem in ['network'] and ikey in ['reservation', 'rule.in', 'rule.out']:
                        if ikey in ['rule.in', 'rule.out']:
                            sikey = 'rule'
                        else:
                            sikey = ikey
                        npath = self.path(f'{elem}.{ikey}', child)
                        for nchild in zkhandler.zk_conn.get_children(npath):
                            nkpath = f'{npath}/{nchild}'
                            for esikey in self.keys(sikey):
                                nkipath = f'{nkpath}/{esikey}'
                                if not zkhandler.zk_conn.exists(nkipath):
                                    result = False

        # One might expect child keys under node (specifically, sriov.pf and sriov.vf) to be
        # managed here as well, but those are created automatically every time pvcnoded starts
        # and thus never need to be validated or applied.

        # These two have several children layers that must be parsed through
        for elem in ['volume']:
            # First read all the subelements of the key class (pool layer)
            for pchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # Finally read all the subelements of the key class (volume layer)
                for vchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}'):
                    child = f'{pchild}/{vchild}'
                    # For each key in the schema for that particular elem
                    for ikey in self.keys(elem):
                        kpath = f'{elem}.{ikey}'
                        # Validate that the key exists for that child
                        if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                            if logger is not None:
                                logger.out(f'Key not found: {self.path(kpath, child)}', state='w')
                            result = False

        for elem in ['snapshot']:
            # First read all the subelements of the key class (pool layer)
            for pchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # Next read all the subelements of the key class (volume layer)
                for vchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}'):
                    # Finally read all the subelements of the key class (snapshot layer)
                    for schild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}/{vchild}'):
                        child = f'{pchild}/{vchild}/{schild}'
                        # For each key in the schema for that particular elem
                        for ikey in self.keys(elem):
                            kpath = f'{elem}.{ikey}'
                            # Validate that the key exists for that child
                            if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                                if logger is not None:
                                    logger.out(f'Key not found: {self.path(kpath, child)}', state='w')
                                result = False

        return result

    # Apply the current schema to the cluster
    def apply(self, zkhandler):
        # Walk the entire tree checking our schema
        for elem in ['base']:
            for key in self.keys(elem):
                kpath = f'{elem}.{key}'
                if not zkhandler.zk_conn.exists(self.path(kpath)):
                    # Ensure that we create base.schema.version with the current valid version value
                    if kpath == 'base.schema.version':
                        data = str(self.version)
                    else:
                        data = ''
                    zkhandler.zk_conn.create(self.path(kpath), data.encode(zkhandler.encoding))

        for elem in ['node', 'domain', 'network', 'osd', 'pool']:
            # First read all the subelements of the key class
            for child in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # For each key in the schema for that particular elem
                for ikey in self.keys(elem):
                    kpath = f'{elem}.{ikey}'
                    # Create the key for that child if it does not yet exist
                    if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                        zkhandler.zk_conn.create(self.path(kpath, child), ''.encode(zkhandler.encoding))

                    # Continue for child keys under network (reservation, acl)
                    if elem in ['network'] and ikey in ['reservation', 'rule.in', 'rule.out']:
                        if ikey in ['rule.in', 'rule.out']:
                            sikey = 'rule'
                        else:
                            sikey = ikey
                        npath = self.path(f'{elem}.{ikey}', child)
                        for nchild in zkhandler.zk_conn.get_children(npath):
                            nkpath = f'{npath}/{nchild}'
                            for esikey in self.keys(sikey):
                                nkipath = f'{nkpath}/{esikey}'
                                if not zkhandler.zk_conn.exists(nkipath):
                                    zkhandler.zk_conn.create(nkipath, ''.encode(zkhandler.encoding))

        # One might expect child keys under node (specifically, sriov.pf and sriov.vf) to be
        # managed here as well, but those are created automatically every time pvcnoded starts
        # and thus never need to be validated or applied.

        # These two have several children layers that must be parsed through
        for elem in ['volume']:
            # First read all the subelements of the key class (pool layer)
            for pchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # Finally read all the subelements of the key class (volume layer)
                for vchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}'):
                    child = f'{pchild}/{vchild}'
                    # For each key in the schema for that particular elem
                    for ikey in self.keys(elem):
                        kpath = f'{elem}.{ikey}'
                        # Create the key for that child if it does not yet exist
                        if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                            zkhandler.zk_conn.create(self.path(kpath, child), ''.encode(zkhandler.encoding))

        for elem in ['snapshot']:
            # First read all the subelements of the key class (pool layer)
            for pchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}')):
                # Next read all the subelements of the key class (volume layer)
                for vchild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}'):
                    # Finally read all the subelements of the key class (snapshot layer)
                    for schild in zkhandler.zk_conn.get_children(self.path(f'base.{elem}') + f'/{pchild}/{vchild}'):
                        child = f'{pchild}/{vchild}/{schild}'
                        # For each key in the schema for that particular elem
                        for ikey in self.keys(elem):
                            kpath = f'{elem}.{ikey}'
                            # Create the key for that child if it does not yet exist
                            if not zkhandler.zk_conn.exists(self.path(kpath, child)):
                                zkhandler.zk_conn.create(self.path(kpath, child), ''.encode(zkhandler.encoding))

    # Migrate key diffs
    def run_migrate(self, zkhandler, changes):
        diff_add = changes['add']
        diff_remove = changes['remove']
        diff_rename = changes['rename']
        add_tasks = list()
        for key in diff_add.keys():
            add_tasks.append((diff_add[key], ''))
        remove_tasks = list()
        for key in diff_remove.keys():
            remove_tasks.append(diff_remove[key])
        rename_tasks = list()
        for key in diff_rename.keys():
            rename_tasks.append((diff_rename[key]['from'], diff_rename[key]['to']))

        zkhandler.write(add_tasks)
        zkhandler.delete(remove_tasks)
        zkhandler.rename(rename_tasks)
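
    # Shape sketch (illustrative): 'changes' is the dict returned by key_diff()
    # below; the concrete keys and paths shown here are hypothetical examples.
    #
    #   {
    #       'add': {'node.data.pvc_version': '/pvcversion'},
    #       'remove': {'node.some_old_key': '/someoldpath'},
    #       'rename': {'node.data.static': {'from': '/olddata', 'to': '/staticdata'}},
    #   }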

    # Migrate from older to newer schema
    def migrate(self, zkhandler, new_version):
        # Determine the versions in between
        versions = ZKSchema.find_all(start=self.version, end=new_version)
        if versions is None:
            return

        for version in versions:
            # Create a new schema at that version
            zkschema_new = ZKSchema()
            zkschema_new.load(version)
            # Get a list of changes
            changes = ZKSchema.key_diff(self, zkschema_new)
            # Apply those changes
            self.run_migrate(zkhandler, changes)

    # Rollback from newer to older schema
    def rollback(self, zkhandler, old_version):
        # Determine the versions in between
        versions = ZKSchema.find_all(start=old_version - 1, end=self.version - 1)
        if versions is None:
            return

        versions.reverse()

        for version in versions:
            # Create a new schema at that version
            zkschema_old = ZKSchema()
            zkschema_old.load(version)
            # Get a list of changes
            changes = ZKSchema.key_diff(self, zkschema_old)
            # Apply those changes
            self.run_migrate(zkhandler, changes)

    @classmethod
    def key_diff(cls, schema_a, schema_b):
        # schema_a = current
        # schema_b = new

        diff_add = dict()
        diff_remove = dict()
        diff_rename = dict()

        # Parse through each core element
        for elem in ['base', 'node', 'domain', 'network', 'osd', 'pool', 'volume', 'snapshot']:
            set_a = set(schema_a.keys(elem))
            set_b = set(schema_b.keys(elem))
            diff_keys = set_a ^ set_b

            for item in diff_keys:
                elem_item = f'{elem}.{item}'
                if item not in schema_a.keys(elem) and item in schema_b.keys(elem):
                    diff_add[elem_item] = schema_b.path(elem_item)
                if item in schema_a.keys(elem) and item not in schema_b.keys(elem):
                    diff_remove[elem_item] = schema_a.path(elem_item)

            for item in set_b:
                elem_item = f'{elem}.{item}'
                if schema_a.path(elem_item) is not None and \
                        schema_b.path(elem_item) is not None and \
                        schema_a.path(elem_item) != schema_b.path(elem_item):
                    diff_rename[elem_item] = {'from': schema_a.path(elem_item), 'to': schema_b.path(elem_item)}

        return {'add': diff_add, 'remove': diff_remove, 'rename': diff_rename}

    # Load in the schema of the current cluster
    @classmethod
    def load_current(cls, zkhandler):
        new_instance = cls()
        version = new_instance.get_version(zkhandler)
        new_instance.load(version)
        return new_instance
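
    # Bootstrap sketch (illustrative): load the schema currently active on a
    # cluster and compare it against the newest on-disk version; 'zkhandler' is
    # assumed to be a connected ZKHandler instance.
    #
    #   schema = ZKSchema.load_current(zkhandler)
    #   if schema.version < ZKSchema.find_latest():
    #       schema.migrate(zkhandler, ZKSchema.find_latest())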

    # Write the latest schema to a file
    @classmethod
    def write(cls):
        schema_file = 'daemon_lib/migrations/versions/{}.json'.format(cls._version)
        with open(schema_file, 'w') as sfh:
            json.dump(cls._schema, sfh)

    # Static methods for reading information from the files
    @staticmethod
    def find_all(start=0, end=None):
        versions = list()
        for version in os.listdir('daemon_lib/migrations/versions'):
            sequence_id = int(version.split('.')[0])
            if end is None:
                if sequence_id > start:
                    versions.append(sequence_id)
            else:
                if sequence_id > start and sequence_id <= end:
                    versions.append(sequence_id)
        if len(versions) > 0:
            return versions
        else:
            return None

    @staticmethod
    def find_latest():
        latest_version = 0
        for version in os.listdir('daemon_lib/migrations/versions'):
            sequence_id = int(version.split('.')[0])
            if sequence_id > latest_version:
                latest_version = sequence_id
        return latest_version