From bd8536d9d17d9545f182bebf1b8346bcf38e9b9f Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sat, 15 Feb 2020 02:09:28 -0500 Subject: [PATCH] Add OVA upload to API (initial) Initial, very barebones OVA parsing and image creation. References #71 --- api-daemon/pvcapid/flaskapi.py | 95 ++++++++ api-daemon/pvcapid/ova.py | 424 +++++++++++++++++++++++++++++++++ debian/control | 2 +- 3 files changed, 520 insertions(+), 1 deletion(-) create mode 100755 api-daemon/pvcapid/ova.py diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index fd8bcac8..f5acd6e0 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -39,6 +39,7 @@ from celery.task.control import inspect import pvcapid.helper as api_helper import pvcapid.provisioner as api_provisioner +import pvcapid.ova as api_ova API_VERSION = 1.0 @@ -94,6 +95,8 @@ try: api_helper.config = config # Set the config object in the api_provisioner namespace api_provisioner.config = config + # Set the config object in the api_ova namespace + api_ova.config = config except Exception as e: print('ERROR: Failed to load configuration: {}'.format(e)) exit(1) @@ -5575,6 +5578,98 @@ class API_Provisioner_Create_Root(Resource): return { "task_id": task.id }, 202, { 'Location': Api.url_for(api, API_Provisioner_Status_Element, task_id=task.id) } api.add_resource(API_Provisioner_Create_Root, '/provisioner/create') +# /provisioner/upload +class API_Provisioner_Upload(Resource): + @RequestParser([ + { 'name': 'ova_size', 'required': True, 'helpmsg': "An OVA size must be specified" }, + { 'name': 'pool', 'required': True, 'helpmsg': "A storage pool must be specified" }, + { 'name': 'name', 'required': True, 'helpmsg': "A VM name must be specified" }, + { 'name': 'define_vm' }, + { 'name': 'start_vm' } + ]) + @Authenticator + def post(self, reqargs): + """ + Upload an OVA image to a new virtual machine + + The API client is responsible for determining and setting the ova_size value, as this value cannot be determined dynamically before the upload proceeds. + + Even if define_vm is false, the name will be used to name the resulting disk volumes as it would with a normally-provisioned VM. + + The resulting VM, even if defined, will not have an attached provisioner profile. + --- + tags: + - provisioner + parameters: + - in: query + name: ova_size + type: string + required: true + description: Size of the OVA file in bytes + - in: query + name: pool + type: string + required: true + description: Storage pool name + - in: query + name: name + type: string + required: true + description: Virtual machine name + - in: query + name: define_vm + type: boolean + required: false + description: Whether to define the VM on the cluster during provisioning + - in: query + name: start_vm + type: boolean + required: false + description: Whether to start the VM after provisioning + responses: + 200: + description: OK + schema: + type: object + properties: + task_id: + type: string + description: Task ID for the provisioner Celery worker + 400: + description: Bad request + schema: + type: object + id: Message + """ + from flask_restful import reqparse + from werkzeug.datastructures import FileStorage + parser = reqparse.RequestParser() + parser.add_argument('file', type=FileStorage, location='files') + data = parser.parse_args() + ova_data = data.get('file', None) + if not ova_data: + return {'message':'An OVA file contents must be specified'}, 400 + + if bool(strtobool(reqargs.get('define_vm', 'true'))): + define_vm = True + else: + define_vm = False + + if bool(strtobool(reqargs.get('start_vm', 'true'))): + start_vm = True + else: + start_vm = False + + return api_ova.upload_ova( + ova_data, + reqargs.get('ova_size', None), + reqargs.get('pool', None), + reqargs.get('name', None), + define_vm, + start_vm + ) +api.add_resource(API_Provisioner_Upload, '/provisioner/upload') + # /provisioner/status class API_Provisioner_Status_Root(Resource): @Authenticator diff --git a/api-daemon/pvcapid/ova.py b/api-daemon/pvcapid/ova.py new file mode 100755 index 00000000..28df5a4e --- /dev/null +++ b/api-daemon/pvcapid/ova.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python3 + +# ova.py - PVC OVA parser library +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018-2020 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +import flask +import json +import psycopg2 +import psycopg2.extras +import os +import re +import time +import math +import tarfile +import shutil +import shlex +import subprocess + +import lxml.etree + +import daemon_lib.common as pvc_common +import daemon_lib.node as pvc_node +import daemon_lib.vm as pvc_vm +import daemon_lib.network as pvc_network +import daemon_lib.ceph as pvc_ceph + +import pvcapid.libvirt_schema as libvirt_schema + +# +# OVA upload function +# +def upload_ova(ova_data, ova_size, pool, name, define_vm, start_vm): + # Upload flow is as follows: + # 1. Create temporary volume of ova_size + # 2. Map the temporary volume for reading + # 3. Write OVA upload file to temporary volume + # 4. Read tar from temporary volume, extract OVF + # 5. Parse OVF, obtain disk list and VM details + # 6. Extract and "upload" via API each disk image to Ceph + # 7. Unmap and remove the temporary volume + # 8. Define VM (if applicable) + # 9. Start VM (if applicable) + ########################################################### + + # Cleanup function + def cleanup_ova_maps_and_volumes(): + # Close the OVA archive + ova_archive.close() + zk_conn = pvc_common.startZKConnection(config['coordinators']) + # Unmap the OVA temporary blockdev + retflag, retdata = pvc_ceph.unmap_volume(zk_conn, pool, "{}_ova".format(name)) + # Remove the OVA temporary blockdev + retflag, retdata = pvc_ceph.remove_volume(zk_conn, pool, "{}_ova".format(name)) + pvc_common.stopZKConnection(zk_conn) + + # Normalize the OVA size to MB + print("Normalize the OVA size to MB") + # The function always return XXXXB, so strip off the B and convert to an integer + ova_size_bytes = int(pvc_ceph.format_bytes_fromhuman(ova_size)[:-1]) + # Put the size into KB which rbd --size can understand + ova_size_kb = math.ceil(ova_size_bytes / 1024) + ova_size = "{}K".format(ova_size_kb) + + # Create a temporary OVA blockdev + print("Create a temporary OVA blockdev") + zk_conn = pvc_common.startZKConnection(config['coordinators']) + print(ova_size) + retflag, retdata = pvc_ceph.add_volume(zk_conn, pool, "{}_ova".format(name), ova_size) + pvc_common.stopZKConnection(zk_conn) + if not retflag: + output = { + 'message': retdata.replace('\"', '\'') + } + retcode = 400 + cleanup_ova_maps_and_volumes() + return output, retcode + + # Map the temporary OVA blockdev + print("Map the temporary OVA blockdev") + zk_conn = pvc_common.startZKConnection(config['coordinators']) + retflag, retdata = pvc_ceph.map_volume(zk_conn, pool, "{}_ova".format(name)) + pvc_common.stopZKConnection(zk_conn) + if not retflag: + output = { + 'message': retdata.replace('\"', '\'') + } + retcode = 400 + cleanup_ova_maps_and_volumes() + return output, retcode + ova_blockdev = retdata + + # Save the OVA data to the temporary blockdev directly + print("Save the OVA data to the temporary blockdev directly") + try: + ova_data.save(ova_blockdev) + except: + output = { + 'message': "ERROR: Failed to write OVA file to temporary volume." + } + retcode = 400 + cleanup_ova_maps_and_volumes() + return output, retcode + + try: + # Set up the TAR reader for the OVA temporary blockdev + print("Set up the TAR reader for the OVA temporary blockdev") + ova_archive = tarfile.open(name=ova_blockdev) + # Determine the files in the OVA + print("Determine the files in the OVA") + members = ova_archive.getmembers() + except tarfile.TarError: + output = { + 'message': "ERROR: The uploaded OVA file is not readable." + } + retcode = 400 + cleanup_ova_maps_and_volumes() + return output, retcode + + # Parse through the members list and extract the OVF file + print("Parse through the members list and extract the OVF file") + for element in set(x for x in members if re.match('.*\.ovf$', x.name)): + ovf_file = ova_archive.extractfile(element) + print(ovf_file) + + # Parse the OVF file to get our VM details + print("Parse the OVF file to get our VM details") + ovf_parser = OVFParser(ovf_file) + virtual_system = ovf_parser.getVirtualSystems()[0] + virtual_hardware = ovf_parser.getVirtualHardware(virtual_system) + disk_map = ovf_parser.getDiskMap(virtual_system) + + # Close the OVF file + print("Close the OVF file") + ovf_file.close() + + print(virtual_hardware) + print(disk_map) + + # Verify that the cluster has enough space to store all OVA disk volumes + total_size_bytes = 0 + for disk in disk_map: + # Normalize the dev size to MB + print("Normalize the dev size to MB") + # The function always return XXXXB, so strip off the B and convert to an integer + dev_size_bytes = int(pvc_ceph.format_bytes_fromhuman(disk.get('capacity', 0))[:-1]) + ova_size_bytes = int(pvc_ceph.format_bytes_fromhuman(ova_size)[:-1]) + # Get the actual image size + total_size_bytes += dev_size_bytes + # Add on the OVA size to account for the VMDK + total_size_bytes += ova_size_bytes + + zk_conn = pvc_common.startZKConnection(config['coordinators']) + pool_information = pvc_ceph.getPoolInformation(zk_conn, pool) + pvc_common.stopZKConnection(zk_conn) + pool_free_space_bytes = int(pool_information['stats']['free_bytes']) + if total_size_bytes >= pool_free_space_bytes: + output = { + 'message': "ERROR: The cluster does not have enough free space ({}) to store the VM ({}).".format( + pvc_ceph.format_bytes_tohuman(pool_free_space_bytes), + pvc_ceph.format_bytes_tohuman(total_size_bytes) + ) + } + retcode = 400 + cleanup_ova_maps_and_volumes() + return output, retcode + + # Create and upload each disk volume + for idx, disk in enumerate(disk_map): + disk_identifier = "sd{}".format(chr(ord('a') + idx)) + volume = "{}_{}".format(name, disk_identifier) + dev_size = disk.get('capacity') + + # Normalize the dev size to MB + print("Normalize the dev size to MB") + # The function always return XXXXB, so strip off the B and convert to an integer + dev_size_bytes = int(pvc_ceph.format_bytes_fromhuman(dev_size)[:-1]) + dev_size_mb = math.ceil(dev_size_bytes / 1024 / 1024) + dev_size = "{}M".format(dev_size_mb) + + def cleanup_img_maps_and_volumes(): + zk_conn = pvc_common.startZKConnection(config['coordinators']) + # Unmap the target blockdev + retflag, retdata = pvc_ceph.unmap_volume(zk_conn, pool, volume) + # Unmap the temporary blockdev + retflag, retdata = pvc_ceph.unmap_volume(zk_conn, pool, "{}_tmp".format(volume)) + # Remove the temporary blockdev + retflag, retdata = pvc_ceph.remove_volume(zk_conn, pool, "{}_tmp".format(volume)) + pvc_common.stopZKConnection(zk_conn) + + # Create target blockdev + zk_conn = pvc_common.startZKConnection(config['coordinators']) + pool_information = pvc_ceph.add_volume(zk_conn, pool, volume, dev_size) + pvc_common.stopZKConnection(zk_conn) + + # Create a temporary blockdev + zk_conn = pvc_common.startZKConnection(config['coordinators']) + retflag, retdata = pvc_ceph.add_volume(zk_conn, pool, "{}_tmp".format(volume), ova_size) + pvc_common.stopZKConnection(zk_conn) + if not retflag: + output = { + 'message': retdata.replace('\"', '\'') + } + retcode = 400 + cleanup_img_maps_and_volumes() + cleanup_ova_maps_and_volumes() + return output, retcode + + # Map the temporary target blockdev + zk_conn = pvc_common.startZKConnection(config['coordinators']) + retflag, retdata = pvc_ceph.map_volume(zk_conn, pool, "{}_tmp".format(volume)) + pvc_common.stopZKConnection(zk_conn) + if not retflag: + output = { + 'message': retdata.replace('\"', '\'') + } + retcode = 400 + cleanup_img_maps_and_volumes() + cleanup_ova_maps_and_volumes() + return output, retcode + temp_blockdev = retdata + + # Map the target blockdev + zk_conn = pvc_common.startZKConnection(config['coordinators']) + retflag, retdata = pvc_ceph.map_volume(zk_conn, pool, volume) + pvc_common.stopZKConnection(zk_conn) + if not retflag: + output = { + 'message': retdata.replace('\"', '\'') + } + retcode = 400 + cleanup_img_maps_and_volumes() + cleanup_ova_maps_and_volumes() + return output, retcode + dest_blockdev = retdata + + # Save the data to the temporary blockdev directly + img_type = disk.get('src').split('.')[-1] + + try: + # Open (extract) the TAR archive file and seek to byte 0 + vmdk_file = ova_archive.extractfile(disk.get('src')) + vmdk_file.seek(0) + # Open the temporary blockdev and seek to byte 0 + blk_file = open(temp_blockdev, 'wb') + blk_file.seek(0) + # Write the contents of vmdk_file into blk_file + bytes_written = blk_file.write(vmdk_file.read()) + # Close blk_file (and flush the buffers) + blk_file.close() + # Close vmdk_file + vmdk_file.close() + # Perform an OS-level sync + pvc_common.run_os_command('sync') + # Shrink the tmp RBD image to the exact size of the written file + # This works around a bug in this method where an EOF is never written to the end of the + # target blockdev, thus causing an "Invalid footer" error. Instead, if we just shrink the + # RBD volume to the exact size, this is treated as an EOF + pvc_common.run_os_command('rbd resize {}/{}_{}_tmp --size {}B --allow-shrink'.format(pool, name, disk_identifier, bytes_written)) + except: + output = { + 'message': "ERROR: Failed to write image file '{}' to temporary volume.".format(disk.get('src')) + } + retcode = 400 + cleanup_img_maps_and_volumes() + cleanup_ova_maps_and_volumes() + return output, retcode + + # Convert from the temporary to destination format on the blockdevs + retcode, stdout, stderr = pvc_common.run_os_command( + 'qemu-img convert -C -f {} -O raw {} {}'.format(img_type, temp_blockdev, dest_blockdev) + ) + if retcode: + output = { + 'message': "ERROR: Failed to convert image '{}' format from '{}' to 'raw': {}".format(disk.get('src'), img_type, stderr) + } + retcode = 400 + cleanup_img_maps_and_volumes() + cleanup_ova_maps_and_volumes() + return output, retcode + + cleanup_img_maps_and_volumes() + + cleanup_ova_maps_and_volumes() + + # Prepare a VM configuration + + output = { + 'message': "Imported OVA file to new VM '{}'".format(name) + } + retcode = 200 + return output, retcode + +# +# OVF parser +# +OVF_SCHEMA = "http://schemas.dmtf.org/ovf/envelope/2" +RASD_SCHEMA = "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/CIM_ResourceAllocationSettingData" +SASD_SCHEMA = "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/CIM_StorageAllocationSettingData.xsd" +VSSD_SCHEMA = "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/CIM_VirtualSystemSettingData" +XML_SCHEMA = "http://www.w3.org/2001/XMLSchema-instance" + +RASD_TYPE = { + "3": "vcpus", + "4": "vram", + "5": "ide-controller", + "6": "scsi-controller", + "10": "ethernet-adapter", + "15": "cdrom", + "17": "disk", + "20": "other-storage-device", + "23": "usb-controller", + "24": "graphics-controller", + "35": "sound-controller" +} +SASD_TYPE = { + "15": "cdrom", + "17": "disk" +} + +class OVFParser(object): + def _getFilelist(self): + path = "{{{schema}}}References/{{{schema}}}File".format(schema=OVF_SCHEMA) + id_attr = "{{{schema}}}id".format(schema=OVF_SCHEMA) + href_attr = "{{{schema}}}href".format(schema=OVF_SCHEMA) + current_list = self.xml.findall(path) + results = [(x.get(id_attr), x.get(href_attr)) for x in current_list] + return results + + def _getDisklist(self): + path = "{{{schema}}}DiskSection/{{{schema}}}Disk".format(schema=OVF_SCHEMA) + id_attr = "{{{schema}}}diskId".format(schema=OVF_SCHEMA) + ref_attr = "{{{schema}}}fileRef".format(schema=OVF_SCHEMA) + cap_attr = "{{{schema}}}capacity".format(schema=OVF_SCHEMA) + current_list = self.xml.findall(path) + results = [(x.get(id_attr), x.get(ref_attr), x.get(cap_attr)) for x in current_list] + return results + + def _getAttributes(self, virtual_system, path, attribute): + current_list = virtual_system.findall(path) + results = [x.get(attribute) for x in current_list] + return results + + def __init__(self, ovf_file): + self.xml = lxml.etree.parse(ovf_file) + self.filelist = self._getFilelist() + self.disklist = self._getDisklist() + + def getVirtualSystems(self): + return self.xml.findall("{{{schema}}}VirtualSystem".format(schema=OVF_SCHEMA)) + + def getVirtualHardware(self, virtual_system): + hardware_list = virtual_system.findall( + "{{{schema}}}VirtualHardwareSection/{{{schema}}}Item".format(schema=OVF_SCHEMA) + ) + virtual_hardware = {} + + for item in hardware_list: + try: + item_type = RASD_TYPE[item.find("{{{rasd}}}ResourceType".format(rasd=RASD_SCHEMA)).text] + except: + continue + quantity = item.find("{{{rasd}}}VirtualQuantity".format(rasd=RASD_SCHEMA)) + if quantity is None: + continue + print(item_type) + virtual_hardware[item_type] = quantity.text + + return virtual_hardware + + def getDiskMap(self, virtual_system): + hardware_list = virtual_system.findall( + "{{{schema}}}VirtualHardwareSection/{{{schema}}}StorageItem".format(schema=OVF_SCHEMA) + ) + disk_list = [] + + for item in hardware_list: + item_type = None + try: + item_type = SASD_TYPE[item.find("{{{sasd}}}ResourceType".format(sasd=SASD_SCHEMA)).text] + except: + item_type = RASD_TYPE[item.find("{{{rasd}}}ResourceType".format(rasd=RASD_SCHEMA)).text] + + if item_type != 'disk': + continue + + hostref = None + try: + hostref = item.find("{{{sasd}}}HostResource".format(sasd=SASD_SCHEMA)) + except: + hostref = item.find("{{{rasd}}}HostResource".format(rasd=RASD_SCHEMA)) + if hostref is None: + continue + disk_res = hostref.text + + # Determine which file this disk_res ultimately represents + (disk_id, disk_ref, disk_capacity) = [x for x in self.disklist if x[0] == disk_res.split('/')[-1]][0] + (file_id, disk_src) = [x for x in self.filelist if x[0] == disk_ref][0] + + # Append the disk with all details to the list + disk_list.append({ + "id": disk_id, + "ref": disk_ref, + "capacity": disk_capacity, + "src": disk_src + }) + + return disk_list diff --git a/debian/control b/debian/control index 4ec2be9e..db10f342 100644 --- a/debian/control +++ b/debian/control @@ -17,7 +17,7 @@ Description: Parallel Virtual Cluster node daemon (Python 3) Package: pvc-daemon-api Architecture: all -Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-gevent, python3-celery, python-celery-common, python3-distutils, redis, python3-redis +Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-gevent, python3-celery, python-celery-common, python3-distutils, redis, python3-redis, python3-lxml Description: Parallel Virtual Cluster API daemon (Python 3) A KVM/Zookeeper/Ceph-based VM and private cloud manager .