Compare commits

..

No commits in common. "35153cd6b64b9740a0f9032ee9ced56775216c24" and "a6f8500309c5547ed98d57181ab138af5800133e" have entirely different histories.

8 changed files with 6273 additions and 6272 deletions

View File

@ -21,5 +21,4 @@
from daemon_lib.zkhandler import ZKSchema
schema = ZKSchema(root_path=".")
schema.write()
ZKSchema.write()

View File

@ -1432,7 +1432,7 @@ def vm_snapshot_receive_block_createsnap(zkhandler, pool, volume, snapshot):
output = {"message": retdata.replace('"', "'")}
return output, retcode
return {"message": "Successfully received RBD snapshot"}, 200
return {"message": "Successfully received VM configuration data"}, 200
@ZKConnection(config)
@ -1448,6 +1448,7 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
First, we need to determine if this is an incremental or full send. If it's full, and the VM already exists,
this is an issue and we have to error. But this should have already happened with the RBD volumes.
"""
print(vm_config)
def parse_unified_diff(diff_text, original_text):
"""
@ -1527,11 +1528,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
False,
snapshot_vm_xml,
)
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
retcode, retmsg = pvc_vm.modify_vm_metadata(
zkhandler,
vm_config["uuid"],
@ -1542,10 +1538,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["migration_method"],
vm_config["migration_max_downtime"],
)
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
current_vm_tags = zkhandler.children(("domain.meta.tags", vm_config["uuid"]))
new_vm_tags = [t["name"] for t in vm_config["tags"]]
@ -1584,10 +1576,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
vm_config["tags"],
"mirror",
)
if not retcode:
retcode = 400
retdata = {"message": retmsg}
return retdata, retcode
# Add this snapshot to the VM manually in Zookeeper
zkhandler.write(
@ -1631,8 +1619,6 @@ def vm_snapshot_receive_config(zkhandler, snapshot, vm_config, source_snapshot=N
]
)
return {"message": "Successfully received VM configuration snapshot"}, 200
#
# Network functions

View File

@ -223,8 +223,6 @@ def cli_cluster_status_format_pretty(CLI_CONFIG, data):
state_colour = ansii["green"]
elif state in ["migrate", "disable", "provision", "mirror"]:
state_colour = ansii["blue"]
elif state in ["mirror"]:
state_colour = ansii["purple"]
elif state in ["stop", "fail"]:
state_colour = ansii["red"]
else:

View File

@ -1861,7 +1861,7 @@ def format_info(config, domain_information, long_output):
"provision": ansiprint.blue(),
"restore": ansiprint.blue(),
"import": ansiprint.blue(),
"mirror": ansiprint.purple(),
"mirror": ansiprint.blue(),
}
ainformation.append(
"{}State:{} {}{}{}".format(
@ -2371,14 +2371,16 @@ def format_list(config, vm_list):
# Format the string (elements)
for domain_information in sorted(vm_list, key=lambda v: v["name"]):
if domain_information["state"] in ["start"]:
if domain_information["state"] == "start":
vm_state_colour = ansiprint.green()
elif domain_information["state"] in ["restart", "shutdown"]:
elif domain_information["state"] == "restart":
vm_state_colour = ansiprint.yellow()
elif domain_information["state"] in ["stop", "fail"]:
elif domain_information["state"] == "shutdown":
vm_state_colour = ansiprint.yellow()
elif domain_information["state"] == "stop":
vm_state_colour = ansiprint.red()
elif domain_information["state"] == "fail":
vm_state_colour = ansiprint.red()
elif domain_information["state"] in ["mirror"]:
vm_state_colour = ansiprint.purple()
else:
vm_state_colour = ansiprint.blue()

View File

@ -3389,17 +3389,18 @@ def vm_worker_send_snapshot(
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
if response.status_code != 200:
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {response.json()['message']}",
f"Failed to send config: {e}",
)
return False
@ -3547,16 +3548,17 @@ def vm_worker_send_snapshot(
buffer[i],
"application/octet-stream",
)
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
if response.status_code != 200:
try:
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send diff batch: {response.json()['message']}",
f"Failed to send diff batch ({e}): {response.json()['message']}",
)
return False
@ -3653,12 +3655,13 @@ def vm_worker_send_snapshot(
params=send_params,
data=full_chunker(),
)
if response.status_code != 200:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -3674,12 +3677,13 @@ def vm_worker_send_snapshot(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params,
)
if response.status_code != 200:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to send snapshot: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -3979,17 +3983,18 @@ def vm_worker_create_mirror(
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
if response.status_code != 200:
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {response.json()['message']}",
f"Failed to send config: {e}",
)
return False
@ -4137,16 +4142,17 @@ def vm_worker_create_mirror(
buffer[i],
"application/octet-stream",
)
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
if response.status_code != 200:
try:
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send diff batch: {response.json()['message']}",
f"Failed to send diff batch ({e}): {response.json()['message']}",
)
return False
@ -4243,12 +4249,13 @@ def vm_worker_create_mirror(
params=send_params,
data=full_chunker(),
)
if response.status_code != 200:
fail(
celery,
f"Failed to create mirror: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to create mirror: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -4264,12 +4271,13 @@ def vm_worker_create_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params,
)
if response.status_code != 200:
fail(
celery,
f"Failed to create mirror: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to create mirror: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -4602,17 +4610,18 @@ def vm_worker_promote_mirror(
"snapshot": snapshot_name,
"source_snapshot": incremental_parent,
}
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
if response.status_code != 200:
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/config",
headers={"Content-Type": "application/json"},
params=send_params,
json=vm_detail,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send config: {response.json()['message']}",
f"Failed to send config: {e}",
)
return False
@ -4760,16 +4769,17 @@ def vm_worker_promote_mirror(
buffer[i],
"application/octet-stream",
)
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
if response.status_code != 200:
try:
response = session.put(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
files=files,
stream=True,
)
response.raise_for_status()
except Exception as e:
fail(
celery,
f"Failed to send diff batch: {response.json()['message']}",
f"Failed to send diff batch ({e}): {response.json()['message']}",
)
return False
@ -4866,12 +4876,13 @@ def vm_worker_promote_mirror(
params=send_params,
data=full_chunker(),
)
if response.status_code != 200:
fail(
celery,
f"Failed to promote mirror: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to promote mirror: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -4887,12 +4898,13 @@ def vm_worker_promote_mirror(
f"{destination_api_uri}/vm/{vm_name}/snapshot/receive/block",
params=send_params,
)
if response.status_code != 200:
fail(
celery,
f"Failed to promote mirror: {response.json()['message']}",
)
return False
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to promote mirror: {response.json()['message']}",
)
return False
finally:
image.close()
ioctx.close()
@ -4917,12 +4929,14 @@ def vm_worker_promote_mirror(
total=total_stages,
)
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/state",
headers={"Content-Type": "application/octet-stream"},
params={"state": previous_vm_state, "wait": True, "force": True},
)
if response.status_code != 200:
try:
response = session.post(
f"{destination_api_uri}/vm/{vm_name}/state",
headers={"Content-Type": "application/octet-stream"},
params={"state": previous_vm_state, "wait": True, "force": True},
)
response.raise_for_status()
except Exception:
fail(
celery,
f"Failed to promote mirror: {response.json()['message']}",

View File

@ -30,8 +30,7 @@ from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError
DEFAULT_ROOT_PATH = "/usr/share/pvc"
SCHEMA_PATH = "daemon_lib/migrations/versions"
SCHEMA_ROOT_PATH = "daemon_lib/migrations/versions"
#
@ -833,8 +832,8 @@ class ZKSchema(object):
def schema(self, schema):
self._schema = schema
def __init__(self, root_path=DEFAULT_ROOT_PATH):
self.schema_path = f"{root_path}/{SCHEMA_PATH}"
def __init__(self):
pass
def __repr__(self):
return f"ZKSchema({self.version})"
@ -874,7 +873,7 @@ class ZKSchema(object):
if not quiet:
print(f"Loading schema version {version}")
with open(f"{self.schema_path}/{version}.json", "r") as sfh:
with open(f"{SCHEMA_ROOT_PATH}/{version}.json", "r") as sfh:
self.schema = json.load(sfh)
self.version = self.schema.get("version")
@ -1136,7 +1135,7 @@ class ZKSchema(object):
# Migrate from older to newer schema
def migrate(self, zkhandler, new_version):
# Determine the versions in between
versions = self.find_all(start=self.version, end=new_version)
versions = ZKSchema.find_all(start=self.version, end=new_version)
if versions is None:
return
@ -1152,7 +1151,7 @@ class ZKSchema(object):
# Rollback from newer to older schema
def rollback(self, zkhandler, old_version):
# Determine the versions in between
versions = self.find_all(start=old_version - 1, end=self.version - 1)
versions = ZKSchema.find_all(start=old_version - 1, end=self.version - 1)
if versions is None:
return
@ -1167,12 +1166,6 @@ class ZKSchema(object):
# Apply those changes
self.run_migrate(zkhandler, changes)
# Write the latest schema to a file
def write(self):
schema_file = f"{self.schema_path}/{self._version}.json"
with open(schema_file, "w") as sfh:
json.dump(self._schema, sfh)
@classmethod
def key_diff(cls, schema_a, schema_b):
# schema_a = current
@ -1218,10 +1211,26 @@ class ZKSchema(object):
return {"add": diff_add, "remove": diff_remove, "rename": diff_rename}
# Load in the schemal of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance
# Write the latest schema to a file
@classmethod
def write(cls):
schema_file = f"{SCHEMA_ROOT_PATH}/{cls._version}.json"
with open(schema_file, "w") as sfh:
json.dump(cls._schema, sfh)
# Static methods for reading information from the files
def find_all(self, start=0, end=None):
@staticmethod
def find_all(start=0, end=None):
versions = list()
for version in os.listdir(self.schema_path):
for version in os.listdir(SCHEMA_ROOT_PATH):
sequence_id = int(version.split(".")[0])
if end is None:
if sequence_id > start:
@ -1234,18 +1243,11 @@ class ZKSchema(object):
else:
return None
def find_latest(self):
@staticmethod
def find_latest():
latest_version = 0
for version in os.listdir(self.schema_path):
for version in os.listdir(SCHEMA_ROOT_PATH):
sequence_id = int(version.split(".")[0])
if sequence_id > latest_version:
latest_version = sequence_id
return latest_version
# Load in the schema of the current cluster
@classmethod
def load_current(cls, zkhandler):
new_instance = cls()
version = new_instance.get_version(zkhandler)
new_instance.load(version)
return new_instance

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff