Compare commits

..

6 Commits

Author SHA1 Message Date
35153cd6b6 Fix path handling for zkhandler
Using full paths broke the local schema generator, so convert these to
proper class instance methods and use them along with a new default +
settable override.
2024-10-11 16:03:40 -04:00
7f7047dd52 Add one more instance of mirror as purple 2024-10-11 14:44:14 -04:00
9a91767405 Add proper return codes to API handlers 2024-10-11 14:43:44 -04:00
bcfa6851e1 Use purple for mirror state colour 2024-10-11 10:44:39 -04:00
28b8b3bb44 Use proper response parsing instead of raise_for 2024-10-11 10:32:15 -04:00
02425159ef Update Grafana graphs 2024-10-11 09:47:19 -04:00
8 changed files with 6203 additions and 6204 deletions

View File

@ -21,4 +21,5 @@
from daemon_lib.zkhandler import ZKSchema from daemon_lib.zkhandler import ZKSchema
ZKSchema.write() schema = ZKSchema(root_path=".")
schema.write()

View File

@ -1432,7 +1432,7 @@ def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
output = {"message": retdata.replace('"', "'")} output = {"message": retdata.replace('"', "'")}
return output, retcode return output, retcode
return {"message": "Successfully received VM configuration data"}, 200 return {"message": "Successfully received RBD snapshot"}, 200
@ZKConnection(config) @ZKConnection(config)
@ -1448,7 +1448,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists, First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists,
this is an issue and we have to error. But this should have already happened with the RBD volumes. this is an issue and we have to error. But this should have already happened with the RBD volumes.
""" """
print(vm_config)
def parse_unified_diff(diff_text, original_text): def parse_unified_diff(diff_text, original_text):
""" """
@ -1528,6 +1527,11 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
False, False,
snapshot_vm_xml, snapshot_vm_xml,
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
retcode, retmsg = pvc_vm.modify_vm_metadata( retcode, retmsg = pvc_vm.modify_vm_metadata(
zkhandler, zkhandler,
vm_config["uuid"], vm_config["uuid"],
@ -1538,6 +1542,10 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["migration_method"], vm_config["migration_method"],
vm_config["migration_max_downtime"], vm_config["migration_max_downtime"],
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"])) current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
new_vm_tags = [t["name"] for t in vm_config["tags"]] new_vm_tags = [t["name"] for t in vm_config["tags"]]
@ -1576,6 +1584,10 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["tags"], vm_config["tags"],
"mirror", "mirror",
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
# Add this snapshot to the VM manually in Zookeeper # Add this snapshot to the VM manually in Zookeeper
zkhandler.write( zkhandler.write(
@ -1619,6 +1631,8 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
] ]
) )
return {"message": "Successfully received VM configuration snapshot"}, 200
# #
# Network functions # Network functions

View File

@ -223,6 +223,8 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
state_colour = ansii["green"] state_colour = ansii["green"]
elif state in ["migrate", "disable", "provision", "mirror"]: elif state in ["migrate", "disable", "provision", "mirror"]:
state_colour = ansii["blue"] state_colour = ansii["blue"]
elif state in ["mirror"]:
state_colour = ansii["purple"]
elif state in ["stop", "fail"]: elif state in ["stop", "fail"]:
state_colour = ansii["red"] state_colour = ansii["red"]
else: else:

View File

@ -1861,7 +1861,7 @@ def format_info(config, domain_information, long_output):
"provision": ansiprint.blue(), "provision": ansiprint.blue(),
"restore": ansiprint.blue(), "restore": ansiprint.blue(),
"import": ansiprint.blue(), "import": ansiprint.blue(),
"mirror": ansiprint.blue(), "mirror": ansiprint.purple(),
} }
ainformation.append( ainformation.append(
"{}State:{} {}{}{}".format( "{}State:{} {}{}{}".format(
@ -2371,16 +2371,14 @@ def format_list(config, vm_list):
# Format the string (elements) # Format the string (elements)
for domain_information in sorted(vm_list, key=lambda v: v["name"]): for domain_information in sorted(vm_list, key=lambda v: v["name"]):
if domain_information["state"] == "start": if domain_information["state"] in ["start"]:
vm_state_colour = ansiprint.green() vm_state_colour = ansiprint.green()
elif domain_information["state"] == "restart": elif domain_information["state"] in ["restart", "shutdown"]:
vm_state_colour = ansiprint.yellow() vm_state_colour = ansiprint.yellow()
elif domain_information["state"] == "shutdown": elif domain_information["state"] in ["stop", "fail"]:
vm_state_colour = ansiprint.yellow()
elif domain_information["state"] == "stop":
vm_state_colour = ansiprint.red()
elif domain_information["state"] == "fail":
vm_state_colour = ansiprint.red() vm_state_colour = ansiprint.red()
elif domain_information["state"] in ["mirror"]:
vm_state_colour = ansiprint.purple()
else: else:
vm_state_colour = ansiprint.blue() vm_state_colour = ansiprint.blue()

View File

@ -3389,18 +3389,17 @@ def vm_worker_send_snapshot(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {e}", f"Failed to send config: {response.json()['message']}",
) )
return False return False
@ -3548,17 +3547,16 @@ def vm_worker_send_snapshot(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch ({e}): {response.json()['message']}", f"Failed to send diff batch: {response.json()['message']}",
) )
return False return False
@ -3655,13 +3653,12 @@ def vm_worker_send_snapshot(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to send snapshot: {response.json()['message']}",
f"Failed to send snapshot: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -3677,13 +3674,12 @@ def vm_worker_send_snapshot(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to send snapshot: {response.json()['message']}",
f"Failed to send snapshot: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -3983,18 +3979,17 @@ def vm_worker_create_mirror(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {e}", f"Failed to send config: {response.json()['message']}",
) )
return False return False
@ -4142,17 +4137,16 @@ def vm_worker_create_mirror(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch ({e}): {response.json()['message']}", f"Failed to send diff batch: {response.json()['message']}",
) )
return False return False
@ -4249,13 +4243,12 @@ def vm_worker_create_mirror(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to create mirror: {response.json()['message']}",
f"Failed to create mirror: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4271,13 +4264,12 @@ def vm_worker_create_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to create mirror: {response.json()['message']}",
f"Failed to create mirror: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4610,18 +4602,17 @@ def vm_worker_promote_mirror(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {e}", f"Failed to send config: {response.json()['message']}",
) )
return False return False
@ -4769,17 +4760,16 @@ def vm_worker_promote_mirror(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
response.raise_for_status() if response.status_code != 200:
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch ({e}): {response.json()['message']}", f"Failed to send diff batch: {response.json()['message']}",
) )
return False return False
@ -4876,13 +4866,12 @@ def vm_worker_promote_mirror(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to promote mirror: {response.json()['message']}",
f"Failed to promote mirror: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4898,13 +4887,12 @@ def vm_worker_promote_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
response.raise_for_status() if response.status_code != 200:
except Exception: fail(
fail( celery,
celery, f"Failed to promote mirror: {response.json()['message']}",
f"Failed to promote mirror: {response.json()['message']}", )
) return False
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4929,14 +4917,12 @@ def vm_worker_promote_mirror(
total=total_stages, total=total_stages,
) )
try: response = session.post(
response = session.post( f"{destination_api_uri}/vm/{vm_name}/state",
f"{destination_api_uri}/vm/{vm_name}/state", headers={"Content-Type": "application/octet-stream"},
headers={"Content-Type": "application/octet-stream"}, params={"state": previous_vm_state, "wait": True, "force": True},
params={"state": previous_vm_state, "wait": True, "force": True}, )
) if response.status_code != 200:
response.raise_for_status()
except Exception:
fail( fail(
celery, celery,
f"Failed to promote mirror: {response.json()['message']}", f"Failed to promote mirror: {response.json()['message']}",

View File

@ -30,7 +30,8 @@ from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError from kazoo.exceptions import NoNodeError
SCHEMA_ROOT_PATH = "daemon_lib/migrations/versions" DEFAULT_ROOT_PATH = "/usr/share/pvc"
SCHEMA_PATH = "daemon_lib/migrations/versions"
# #
@ -832,8 +833,8 @@ class ZKSchema(object):
def schema(self, schema): def schema(self, schema):
self._schema = schema self._schema = schema
def __init__(self): def __init__(self, root_path=DEFAULT_ROOT_PATH):
pass self.schema_path = f"{root_path}/{SCHEMA_PATH}"
def __repr__(self): def __repr__(self):
return f"ZKSchema({self.version})" return f"ZKSchema({self.version})"
@ -873,7 +874,7 @@ class ZKSchema(object):
if not quiet: if not quiet:
print(f"Loading schema version {version}") print(f"Loading schema version {version}")
with open(f"{SCHEMA_ROOT_PATH}/{version}.json", "r") as sfh: with open(f"{self.schema_path}/{version}.json", "r") as sfh:
self.schema = json.load(sfh) self.schema = json.load(sfh)
self.version = self.schema.get("version") self.version = self.schema.get("version")
@ -1135,7 +1136,7 @@ class ZKSchema(object):
# Migrate from older to newer schema # Migrate from older to newer schema
def migrate(self, zkhandler, new_version): def migrate(self, zkhandler, new_version):
# Determine the versions in between # Determine the versions in between
versions = ZKSchema.find_all(start=self.version, end=new_version) versions = self.find_all(start=self.version, end=new_version)
if versions is None: if versions is None:
return return
@ -1151,7 +1152,7 @@ class ZKSchema(object):
# Rollback from newer to older schema # Rollback from newer to older schema
def rollback(self, zkhandler, old_version): def rollback(self, zkhandler, old_version):
# Determine the versions in between # Determine the versions in between
versions = ZKSchema.find_all(start=old_version - 1, end=self.version - 1) versions = self.find_all(start=old_version - 1, end=self.version - 1)
if versions is None: if versions is None:
return return
@ -1166,6 +1167,12 @@ class ZKSchema(object):
# Apply those changes # Apply those changes
self.run_migrate(zkhandler, changes) self.run_migrate(zkhandler, changes)
# Write the latest schema to a file
def write(self):
schema_file = f"{self.schema_path}/{self._version}.json"
with open(schema_file, "w") as sfh:
json.dump(self._schema, sfh)
@classmethod @classmethod
def key_diff(cls, schema_a, schema_b): def key_diff(cls, schema_a, schema_b):
# schema_a = current # schema_a = current
@ -1211,26 +1218,10 @@ class ZKSchema(object):
return {"add": diff_add, "remove": diff_remove, "rename": diff_rename} return {"add": diff_add, "remove": diff_remove, "rename": diff_rename}
# Load in the schemal of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance
# Write the latest schema to a file
@classmethod
def write(cls):
schema_file = f"{SCHEMA_ROOT_PATH}/{cls._version}.json"
with open(schema_file, "w") as sfh:
json.dump(cls._schema, sfh)
# Static methods for reading information from the files # Static methods for reading information from the files
@staticmethod def find_all(self, start=0, end=None):
def find_all(start=0, end=None):
versions = list() versions = list()
for version in os.listdir(SCHEMA_ROOT_PATH): for version in os.listdir(self.schema_path):
sequence_id = int(version.split(".")[0]) sequence_id = int(version.split(".")[0])
if end is None: if end is None:
if sequence_id > start: if sequence_id > start:
@ -1243,11 +1234,18 @@ class ZKSchema(object):
else: else:
return None return None
@staticmethod def find_latest(self):
def find_latest():
latest_version = 0 latest_version = 0
for version in os.listdir(SCHEMA_ROOT_PATH): for version in os.listdir(self.schema_path):
sequence_id = int(version.split(".")[0]) sequence_id = int(version.split(".")[0])
if sequence_id > latest_version: if sequence_id > latest_version:
latest_version = sequence_id latest_version = sequence_id
return latest_version return latest_version
# Load in the schema of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff