Improve loading efficiency of common.py

This commit is contained in:
Joshua Boniface 2025-03-12 22:54:38 -04:00
parent 60967b5606
commit ee055bdb81

View File

@ -19,12 +19,13 @@
# #
############################################################################### ###############################################################################
import os
import math
import time
import requests
import click
from ast import literal_eval from ast import literal_eval
from click import echo, progressbar
from math import ceil
from os.path import getsize
from requests import get, post, put, patch, delete, Response
from requests.exceptions import ConnectionError
from time import time
from urllib3 import disable_warnings from urllib3 import disable_warnings
@ -39,7 +40,7 @@ def format_bytes(size_bytes):
} }
human_bytes = "0B" human_bytes = "0B"
for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get): for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get):
formatted_bytes = int(math.ceil(size_bytes / byte_unit_matrix[unit])) formatted_bytes = int(ceil(size_bytes / byte_unit_matrix[unit]))
if formatted_bytes < 10000: if formatted_bytes < 10000:
human_bytes = "{}{}".format(formatted_bytes, unit) human_bytes = "{}{}".format(formatted_bytes, unit)
break break
@ -57,7 +58,7 @@ def format_metric(integer):
} }
human_integer = "0" human_integer = "0"
for unit in sorted(integer_unit_matrix, key=integer_unit_matrix.get): for unit in sorted(integer_unit_matrix, key=integer_unit_matrix.get):
formatted_integer = int(math.ceil(integer / integer_unit_matrix[unit])) formatted_integer = int(ceil(integer / integer_unit_matrix[unit]))
if formatted_integer < 10000: if formatted_integer < 10000:
human_integer = "{}{}".format(formatted_integer, unit) human_integer = "{}{}".format(formatted_integer, unit)
break break
@ -97,12 +98,12 @@ def format_age(age_secs):
class UploadProgressBar(object): class UploadProgressBar(object):
def __init__(self, filename, end_message="", end_nl=True): def __init__(self, filename, end_message="", end_nl=True):
file_size = os.path.getsize(filename) file_size = getsize(filename)
file_size_human = format_bytes(file_size) file_size_human = format_bytes(file_size)
click.echo("Uploading file (total size {})...".format(file_size_human)) echo("Uploading file (total size {})...".format(file_size_human))
self.length = file_size self.length = file_size
self.time_last = int(round(time.time() * 1000)) - 1000 self.time_last = int(round(time() * 1000)) - 1000
self.bytes_last = 0 self.bytes_last = 0
self.bytes_diff = 0 self.bytes_diff = 0
self.is_end = False self.is_end = False
@ -114,7 +115,7 @@ class UploadProgressBar(object):
else: else:
self.end_suffix = "" self.end_suffix = ""
self.bar = click.progressbar(length=self.length, width=20, show_eta=True) self.bar = progressbar(length=self.length, width=20, show_eta=True)
def update(self, monitor): def update(self, monitor):
bytes_cur = monitor.bytes_read bytes_cur = monitor.bytes_read
@ -123,7 +124,7 @@ class UploadProgressBar(object):
self.is_end = True self.is_end = True
self.bytes_last = bytes_cur self.bytes_last = bytes_cur
time_cur = int(round(time.time() * 1000)) time_cur = int(round(time() * 1000))
if (time_cur - 1000) > self.time_last: if (time_cur - 1000) > self.time_last:
self.time_last = time_cur self.time_last = time_cur
self.bar.update(self.bytes_diff) self.bar.update(self.bytes_diff)
@ -132,13 +133,13 @@ class UploadProgressBar(object):
if self.is_end: if self.is_end:
self.bar.update(self.bytes_diff) self.bar.update(self.bytes_diff)
self.bytes_diff = 0 self.bytes_diff = 0
click.echo() echo()
click.echo() echo()
if self.end_message: if self.end_message:
click.echo(self.end_message + self.end_suffix, nl=self.end_nl) echo(self.end_message + self.end_suffix, nl=self.end_nl)
class ErrorResponse(requests.Response): class ErrorResponse(Response):
def __init__(self, json_data, status_code, headers): def __init__(self, json_data, status_code, headers):
self.json_data = json_data self.json_data = json_data
self.status_code = status_code self.status_code = status_code
@ -178,7 +179,7 @@ def call_api(
for i in range(3): for i in range(3):
failed = False failed = False
try: try:
response = requests.get( response = get(
uri, uri,
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@ -190,16 +191,14 @@ def call_api(
failed = True failed = True
continue continue
break break
except requests.exceptions.ConnectionError: except ConnectionError:
failed = True failed = True
continue continue
if failed: if failed:
error = f"Code {response.status_code}" if response else "Timeout" error = f"Code {response.status_code}" if response else "Timeout"
raise requests.exceptions.ConnectionError( raise ConnectionError(f"Failed to connect after 3 tries ({error})")
f"Failed to connect after 3 tries ({error})"
)
if operation == "post": if operation == "post":
response = requests.post( response = post(
uri, uri,
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@ -209,7 +208,7 @@ def call_api(
verify=config["verify_ssl"], verify=config["verify_ssl"],
) )
if operation == "put": if operation == "put":
response = requests.put( response = put(
uri, uri,
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@ -219,7 +218,7 @@ def call_api(
verify=config["verify_ssl"], verify=config["verify_ssl"],
) )
if operation == "patch": if operation == "patch":
response = requests.patch( response = patch(
uri, uri,
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@ -228,7 +227,7 @@ def call_api(
verify=config["verify_ssl"], verify=config["verify_ssl"],
) )
if operation == "delete": if operation == "delete":
response = requests.delete( response = patch, delete(
uri, uri,
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@ -243,10 +242,10 @@ def call_api(
# Display debug output # Display debug output
if config["debug"]: if config["debug"]:
click.echo("API endpoint: {}".format(uri), err=True) echo("API endpoint: {}".format(uri), err=True)
click.echo("Response code: {}".format(response.status_code), err=True) echo("Response code: {}".format(response.status_code), err=True)
click.echo("Response headers: {}".format(response.headers), err=True) echo("Response headers: {}".format(response.headers), err=True)
click.echo(err=True) echo(err=True)
# Return the response object # Return the response object
return response return response