Compare commits

..

No commits in common. "35153cd6b64b9740a0f9032ee9ced56775216c24" and "a6f8500309c5547ed98d57181ab138af5800133e" have entirely different histories.

8 changed files with 6273 additions and 6272 deletions

View File

@ -21,5 +21,4 @@
from daemon_lib.zkhandler import ZKSchema from daemon_lib.zkhandler import ZKSchema
schema = ZKSchema(root_path=".") ZKSchema.write()
schema.write()

View File

@ -1432,7 +1432,7 @@ def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
output = {"message": retdata.replace('"', "'")} output = {"message": retdata.replace('"', "'")}
return output, retcode return output, retcode
return {"message": "Successfully received RBD snapshot"}, 200 return {"message": "Successfully received VM configuration data"}, 200
@ZKConnection(config) @ZKConnection(config)
@ -1448,6 +1448,7 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists, First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists,
this is an issue and we have to error. But this should have already happened with the RBD volumes. this is an issue and we have to error. But this should have already happened with the RBD volumes.
""" """
print(vm_config)
def parse_unified_diff(diff_text, original_text): def parse_unified_diff(diff_text, original_text):
""" """
@ -1527,11 +1528,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
False, False,
snapshot_vm_xml, snapshot_vm_xml,
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
retcode, retmsg = pvc_vm.modify_vm_metadata( retcode, retmsg = pvc_vm.modify_vm_metadata(
zkhandler, zkhandler,
vm_config["uuid"], vm_config["uuid"],
@ -1542,10 +1538,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["migration_method"], vm_config["migration_method"],
vm_config["migration_max_downtime"], vm_config["migration_max_downtime"],
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"])) current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
new_vm_tags = [t["name"] for t in vm_config["tags"]] new_vm_tags = [t["name"] for t in vm_config["tags"]]
@ -1584,10 +1576,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["tags"], vm_config["tags"],
"mirror", "mirror",
) )
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
# Add this snapshot to the VM manually in Zookeeper # Add this snapshot to the VM manually in Zookeeper
zkhandler.write( zkhandler.write(
@ -1631,8 +1619,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
] ]
) )
return {"message": "Successfully received VM configuration snapshot"}, 200
# #
# Network functions # Network functions

View File

@ -223,8 +223,6 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
state_colour = ansii["green"] state_colour = ansii["green"]
elif state in ["migrate", "disable", "provision", "mirror"]: elif state in ["migrate", "disable", "provision", "mirror"]:
state_colour = ansii["blue"] state_colour = ansii["blue"]
elif state in ["mirror"]:
state_colour = ansii["purple"]
elif state in ["stop", "fail"]: elif state in ["stop", "fail"]:
state_colour = ansii["red"] state_colour = ansii["red"]
else: else:

View File

@ -1861,7 +1861,7 @@ def format_info(config, domain_information, long_output):
"provision": ansiprint.blue(), "provision": ansiprint.blue(),
"restore": ansiprint.blue(), "restore": ansiprint.blue(),
"import": ansiprint.blue(), "import": ansiprint.blue(),
"mirror": ansiprint.purple(), "mirror": ansiprint.blue(),
} }
ainformation.append( ainformation.append(
"{}State:{} {}{}{}".format( "{}State:{} {}{}{}".format(
@ -2371,14 +2371,16 @@ def format_list(config, vm_list):
# Format the string (elements) # Format the string (elements)
for domain_information in sorted(vm_list, key=lambda v: v["name"]): for domain_information in sorted(vm_list, key=lambda v: v["name"]):
if domain_information["state"] in ["start"]: if domain_information["state"] == "start":
vm_state_colour = ansiprint.green() vm_state_colour = ansiprint.green()
elif domain_information["state"] in ["restart", "shutdown"]: elif domain_information["state"] == "restart":
vm_state_colour = ansiprint.yellow() vm_state_colour = ansiprint.yellow()
elif domain_information["state"] in ["stop", "fail"]: elif domain_information["state"] == "shutdown":
vm_state_colour = ansiprint.yellow()
elif domain_information["state"] == "stop":
vm_state_colour = ansiprint.red()
elif domain_information["state"] == "fail":
vm_state_colour = ansiprint.red() vm_state_colour = ansiprint.red()
elif domain_information["state"] in ["mirror"]:
vm_state_colour = ansiprint.purple()
else: else:
vm_state_colour = ansiprint.blue() vm_state_colour = ansiprint.blue()

View File

@ -3389,17 +3389,18 @@ def vm_worker_send_snapshot(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {response.json()['message']}", f"Failed to send config: {e}",
) )
return False return False
@ -3547,16 +3548,17 @@ def vm_worker_send_snapshot(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch: {response.json()['message']}", f"Failed to send diff batch ({e}): {response.json()['message']}",
) )
return False return False
@ -3653,12 +3655,13 @@ def vm_worker_send_snapshot(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to send snapshot: {response.json()['message']}", celery,
) f"Failed to send snapshot: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -3674,12 +3677,13 @@ def vm_worker_send_snapshot(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to send snapshot: {response.json()['message']}", celery,
) f"Failed to send snapshot: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -3979,17 +3983,18 @@ def vm_worker_create_mirror(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {response.json()['message']}", f"Failed to send config: {e}",
) )
return False return False
@ -4137,16 +4142,17 @@ def vm_worker_create_mirror(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch: {response.json()['message']}", f"Failed to send diff batch ({e}): {response.json()['message']}",
) )
return False return False
@ -4243,12 +4249,13 @@ def vm_worker_create_mirror(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to create mirror: {response.json()['message']}", celery,
) f"Failed to create mirror: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4264,12 +4271,13 @@ def vm_worker_create_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to create mirror: {response.json()['message']}", celery,
) f"Failed to create mirror: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4602,17 +4610,18 @@ def vm_worker_promote_mirror(
"snapshot": snapshot_name, "snapshot": snapshot_name,
"source_snapshot": incremental_parent, "source_snapshot": incremental_parent,
} }
try:
response = session.post( response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
params=send_params, params=send_params,
json=vm_detail, json=vm_detail,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send config: {response.json()['message']}", f"Failed to send config: {e}",
) )
return False return False
@ -4760,16 +4769,17 @@ def vm_worker_promote_mirror(
buffer[i], buffer[i],
"application/octet-stream", "application/octet-stream",
) )
try:
response = session.put( response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files, files=files,
stream=True, stream=True,
) )
if response.status_code != 200: response.raise_for_status()
except Exception as e:
fail( fail(
celery, celery,
f"Failed to send diff batch: {response.json()['message']}", f"Failed to send diff batch ({e}): {response.json()['message']}",
) )
return False return False
@ -4866,12 +4876,13 @@ def vm_worker_promote_mirror(
params=send_params, params=send_params,
data=full_chunker(), data=full_chunker(),
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to promote mirror: {response.json()['message']}", celery,
) f"Failed to promote mirror: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4887,12 +4898,13 @@ def vm_worker_promote_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block", f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params, params=send_params,
) )
if response.status_code != 200: response.raise_for_status()
fail( except Exception:
celery, fail(
f"Failed to promote mirror: {response.json()['message']}", celery,
) f"Failed to promote mirror: {response.json()['message']}",
return False )
return False
finally: finally:
image.close() image.close()
ioctx.close() ioctx.close()
@ -4917,12 +4929,14 @@ def vm_worker_promote_mirror(
total=total_stages, total=total_stages,
) )
response = session.post( try:
f"{destination_api_uri}/vm/{vm_name}/state", response = session.post(
headers={"Content-Type": "application/octet-stream"}, f"{destination_api_uri}/vm/{vm_name}/state",
params={"state": previous_vm_state, "wait": True, "force": True}, headers={"Content-Type": "application/octet-stream"},
) params={"state": previous_vm_state, "wait": True, "force": True},
if response.status_code != 200: )
response.raise_for_status()
except Exception:
fail( fail(
celery, celery,
f"Failed to promote mirror: {response.json()['message']}", f"Failed to promote mirror: {response.json()['message']}",

View File

@ -30,8 +30,7 @@ from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError from kazoo.exceptions import NoNodeError
DEFAULT_ROOT_PATH = "/usr/share/pvc" SCHEMA_ROOT_PATH = "daemon_lib/migrations/versions"
SCHEMA_PATH = "daemon_lib/migrations/versions"
# #
@ -833,8 +832,8 @@ class ZKSchema(object):
def schema(self, schema): def schema(self, schema):
self._schema = schema self._schema = schema
def __init__(self, root_path=DEFAULT_ROOT_PATH): def __init__(self):
self.schema_path = f"{root_path}/{SCHEMA_PATH}" pass
def __repr__(self): def __repr__(self):
return f"ZKSchema({self.version})" return f"ZKSchema({self.version})"
@ -874,7 +873,7 @@ class ZKSchema(object):
if not quiet: if not quiet:
print(f"Loading schema version {version}") print(f"Loading schema version {version}")
with open(f"{self.schema_path}/{version}.json", "r") as sfh: with open(f"{SCHEMA_ROOT_PATH}/{version}.json", "r") as sfh:
self.schema = json.load(sfh) self.schema = json.load(sfh)
self.version = self.schema.get("version") self.version = self.schema.get("version")
@ -1136,7 +1135,7 @@ class ZKSchema(object):
# Migrate from older to newer schema # Migrate from older to newer schema
def migrate(self, zkhandler, new_version): def migrate(self, zkhandler, new_version):
# Determine the versions in between # Determine the versions in between
versions = self.find_all(start=self.version, end=new_version) versions = ZKSchema.find_all(start=self.version, end=new_version)
if versions is None: if versions is None:
return return
@ -1152,7 +1151,7 @@ class ZKSchema(object):
# Rollback from newer to older schema # Rollback from newer to older schema
def rollback(self, zkhandler, old_version): def rollback(self, zkhandler, old_version):
# Determine the versions in between # Determine the versions in between
versions = self.find_all(start=old_version - 1, end=self.version - 1) versions = ZKSchema.find_all(start=old_version - 1, end=self.version - 1)
if versions is None: if versions is None:
return return
@ -1167,12 +1166,6 @@ class ZKSchema(object):
# Apply those changes # Apply those changes
self.run_migrate(zkhandler, changes) self.run_migrate(zkhandler, changes)
# Write the latest schema to a file
def write(self):
schema_file = f"{self.schema_path}/{self._version}.json"
with open(schema_file, "w") as sfh:
json.dump(self._schema, sfh)
@classmethod @classmethod
def key_diff(cls, schema_a, schema_b): def key_diff(cls, schema_a, schema_b):
# schema_a = current # schema_a = current
@ -1218,10 +1211,26 @@ class ZKSchema(object):
return {"add": diff_add, "remove": diff_remove, "rename": diff_rename} return {"add": diff_add, "remove": diff_remove, "rename": diff_rename}
# Load in the schemal of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance
# Write the latest schema to a file
@classmethod
def write(cls):
schema_file = f"{SCHEMA_ROOT_PATH}/{cls._version}.json"
with open(schema_file, "w") as sfh:
json.dump(cls._schema, sfh)
# Static methods for reading information from the files # Static methods for reading information from the files
def find_all(self, start=0, end=None): @staticmethod
def find_all(start=0, end=None):
versions = list() versions = list()
for version in os.listdir(self.schema_path): for version in os.listdir(SCHEMA_ROOT_PATH):
sequence_id = int(version.split(".")[0]) sequence_id = int(version.split(".")[0])
if end is None: if end is None:
if sequence_id > start: if sequence_id > start:
@ -1234,18 +1243,11 @@ class ZKSchema(object):
else: else:
return None return None
def find_latest(self): @staticmethod
def find_latest():
latest_version = 0 latest_version = 0
for version in os.listdir(self.schema_path): for version in os.listdir(SCHEMA_ROOT_PATH):
sequence_id = int(version.split(".")[0]) sequence_id = int(version.split(".")[0])
if sequence_id > latest_version: if sequence_id > latest_version:
latest_version = sequence_id latest_version = sequence_id
return latest_version return latest_version
# Load in the schema of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff