Move to lazy imports and custom API client

This commit is contained in:
Joshua Boniface 2025-03-13 00:26:47 -04:00
parent bb77c5f1fc
commit 52aa351c60
3 changed files with 487 additions and 282 deletions

File diff suppressed because it is too large Load Diff

View File

@ -23,10 +23,11 @@ from ast import literal_eval
from click import echo, progressbar from click import echo, progressbar
from math import ceil from math import ceil
from os.path import getsize from os.path import getsize
from requests import get, post, put, patch, delete, Response
from requests.exceptions import ConnectionError
from time import time from time import time
from urllib3 import disable_warnings import socket
import json
import ssl
import base64
def format_bytes(size_bytes): def format_bytes(size_bytes):
@ -139,14 +140,186 @@ class UploadProgressBar(object):
echo(self.end_message + self.end_suffix, nl=self.end_nl) echo(self.end_message + self.end_suffix, nl=self.end_nl)
class ErrorResponse(Response): class Response:
def __init__(self, json_data, status_code, headers): """Minimal Response class to replace requests.Response"""
self.json_data = json_data def __init__(self, status_code, headers, content):
self.status_code = status_code self.status_code = status_code
self.headers = headers self.headers = headers
self.content = content
self._json = None
def json(self): def json(self):
return self.json_data if self._json is None:
try:
self._json = json.loads(self.content.decode('utf-8'))
except json.JSONDecodeError:
self._json = {}
return self._json
class ConnectionError(Exception):
"""Simple ConnectionError class to replace requests.exceptions.ConnectionError"""
pass
class ErrorResponse(Response):
def __init__(self, json_data, status_code, headers):
self.status_code = status_code
self.headers = headers
self._json = json_data
self.content = json.dumps(json_data).encode('utf-8') if json_data else b''
def json(self):
return self._json
def _parse_url(url):
"""Simple URL parser without using urllib"""
if '://' in url:
scheme, rest = url.split('://', 1)
else:
scheme = 'http'
rest = url
if '/' in rest:
host_port, path = rest.split('/', 1)
path = '/' + path
else:
host_port = rest
path = '/'
if ':' in host_port:
host, port_str = host_port.split(':', 1)
port = int(port_str)
else:
host = host_port
port = 443 if scheme == 'https' else 80
return scheme, host, port, path
def _encode_params(params):
"""Simple URL parameter encoder"""
if not params:
return ''
parts = []
for key, value in params.items():
if isinstance(value, bool):
value = str(value).lower()
elif value is None:
value = ''
else:
value = str(value)
parts.append(f"{key}={value}")
return '?' + '&'.join(parts)
def _make_request(method, url, headers=None, params=None, data=None, files=None, timeout=5, verify=True):
"""Simple HTTP client using sockets"""
headers = headers or {}
scheme, host, port, path = _parse_url(url)
# Add query parameters
path += _encode_params(params)
# Prepare body
body = None
if data is not None and files is None:
if isinstance(data, dict):
body = json.dumps(data).encode('utf-8')
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
else:
body = data.encode('utf-8') if isinstance(data, str) else data
# Handle file uploads
if files:
boundary = f'----WebKitFormBoundary{int(time())}'
headers['Content-Type'] = f'multipart/form-data; boundary={boundary}'
body = b''
# Add form fields
if data:
for key, value in data.items():
body += f'--{boundary}\r\n'.encode()
body += f'Content-Disposition: form-data; name="{key}"\r\n\r\n'.encode()
body += f'{value}\r\n'.encode()
# Add files
for key, file_tuple in files.items():
filename, fileobj, content_type = file_tuple
body += f'--{boundary}\r\n'.encode()
body += f'Content-Disposition: form-data; name="{key}"; filename="{filename}"\r\n'.encode()
body += f'Content-Type: {content_type}\r\n\r\n'.encode()
body += fileobj.read()
body += b'\r\n'
body += f'--{boundary}--\r\n'.encode()
# Create socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
try:
# Handle SSL for HTTPS
if scheme == 'https':
context = ssl.create_default_context()
if not verify:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
# Connect
s.connect((host, port))
# Build request
request = f"{method} {path} HTTP/1.1\r\n"
request += f"Host: {host}\r\n"
for key, value in headers.items():
request += f"{key}: {value}\r\n"
if body:
request += f"Content-Length: {len(body)}\r\n"
request += "Connection: close\r\n\r\n"
# Send request
s.sendall(request.encode('utf-8'))
if body:
s.sendall(body)
# Read response
response_data = b""
while True:
chunk = s.recv(4096)
if not chunk:
break
response_data += chunk
# Parse response
header_end = response_data.find(b'\r\n\r\n')
headers_raw = response_data[:header_end].decode('utf-8')
body_raw = response_data[header_end + 4:]
# Parse status code
status_line = headers_raw.split('\r\n')[0]
status_code = int(status_line.split(' ')[1])
# Parse headers
headers_dict = {}
for line in headers_raw.split('\r\n')[1:]:
if not line:
continue
key, value = line.split(':', 1)
headers_dict[key.strip()] = value.strip()
return Response(status_code, headers_dict, body_raw)
finally:
s.close()
def call_api( def call_api(
@ -159,7 +332,7 @@ def call_api(
files=None, files=None,
): ):
# Set the connect timeout to 2 seconds but extremely long (48 hour) data timeout # Set the connect timeout to 2 seconds but extremely long (48 hour) data timeout
timeout = (2.05, 172800) timeout = 2.05
# Craft the URI # Craft the URI
uri = "{}://{}{}{}".format( uri = "{}://{}{}{}".format(
@ -171,7 +344,6 @@ def call_api(
headers["X-Api-Key"] = config["api_key"] headers["X-Api-Key"] = config["api_key"]
# Determine the request type and hit the API # Determine the request type and hit the API
disable_warnings()
try: try:
response = None response = None
if operation == "get": if operation == "get":
@ -179,61 +351,66 @@ def call_api(
for i in range(3): for i in range(3):
failed = False failed = False
try: try:
response = get( response = _make_request(
"GET",
uri, uri,
timeout=timeout,
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
verify=config["verify_ssl"], timeout=timeout,
verify=config["verify_ssl"]
) )
if response.status_code in retry_on_code: if response.status_code in retry_on_code:
failed = True failed = True
continue continue
break break
except ConnectionError: except Exception:
failed = True failed = True
continue continue
if failed: if failed:
error = f"Code {response.status_code}" if response else "Timeout" error = f"Code {response.status_code}" if response else "Timeout"
raise ConnectionError(f"Failed to connect after 3 tries ({error})") raise ConnectionError(f"Failed to connect after 3 tries ({error})")
if operation == "post": elif operation == "post":
response = post( response = _make_request(
"POST",
uri, uri,
timeout=timeout,
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
files=files, files=files,
verify=config["verify_ssl"],
)
if operation == "put":
response = put(
uri,
timeout=timeout, timeout=timeout,
verify=config["verify_ssl"]
)
elif operation == "put":
response = _make_request(
"PUT",
uri,
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
files=files, files=files,
verify=config["verify_ssl"],
)
if operation == "patch":
response = patch(
uri,
timeout=timeout, timeout=timeout,
verify=config["verify_ssl"]
)
elif operation == "patch":
response = _make_request(
"PATCH",
uri,
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
verify=config["verify_ssl"],
)
if operation == "delete":
response = patch, delete(
uri,
timeout=timeout, timeout=timeout,
verify=config["verify_ssl"]
)
elif operation == "delete":
response = _make_request(
"DELETE",
uri,
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
verify=config["verify_ssl"], timeout=timeout,
verify=config["verify_ssl"]
) )
except Exception as e: except Exception as e:
message = "Failed to connect to the API: {}".format(e) message = "Failed to connect to the API: {}".format(e)

View File

@ -0,0 +1,17 @@
"""
Lazy import mechanism for PVC CLI to reduce startup time
"""
class LazyModule:
"""
A proxy for a module that is loaded only when actually used
"""
def __init__(self, name):
self.name = name
self._module = None
def __getattr__(self, attr):
if self._module is None:
import importlib
self._module = importlib.import_module(self.name)
return getattr(self._module, attr)