some refactor

This commit is contained in:
Mathieu Broillet 2024-08-04 12:58:59 +02:00
parent 0d7351d651
commit 9cf457511e
Signed by: mathieu
GPG Key ID: C4A6176ABC6B2DFC

View File

@ -30,9 +30,9 @@ information. The server is created using Flask and the ultimateALPR SDK is used
See the README.md file for more information on how to run this script.
"""
# Load configuration from a JSON file or environment variables
# Load configuration
CONFIG_PATH = os.path.join(bundle_dir,
'config.json') # TODO: store config file outside of bundle (to avoid compilation by users)
'config.json') # TODO: store config file outside of bundle (to remove need for compilation by users)
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH, 'r') as config_file:
JSON_CONFIG = json.load(config_file)
@ -87,11 +87,11 @@ def start_backend_loop():
load_engine()
# loop for about an hour or 3000 requests then reload the engine (fix for trial license)
while counter < 3000 and time.time() - boot_time < 60 * 60:
while counter < 3000 and time.time() - boot_time < 3600:
# every 120 sec
if int(time.time()) % 120 == 0:
if not is_engine_loaded():
unload_engine() # just in case
unload_engine()
load_engine()
time.sleep(1)
@ -126,23 +126,15 @@ def process_image(image: Image) -> str:
counter += 1
width, height = image.size
if image.mode in IMAGE_TYPES_MAPPING:
image_type = IMAGE_TYPES_MAPPING[image.mode]
else:
raise ValueError("Invalid mode: %s" % image.mode)
image_type = IMAGE_TYPES_MAPPING.get(image.mode, None)
if image_type is None:
raise ValueError(f"Invalid mode: {image.mode}")
result = ultimateAlprSdk.UltAlprSdkEngine_process(
image_type,
image.tobytes(),
width,
height,
0, # stride
1 # exifOrientation
image_type, image.tobytes(), width, height, 0, 1
)
if not result.isOK():
raise RuntimeError("Process failed: %s" % result.phrase())
else:
raise RuntimeError(f"Process failed: {result.phrase()}")
return result.json()
@ -172,34 +164,30 @@ def create_rest_server_flask():
else:
wanted_cells = list(range(1, grid_size * grid_size + 1))
image = request.files['upload']
if image.filename == '':
image_file = request.files['upload']
if image_file.filename == '':
return jsonify({'error': 'No selected file'}), 400
image = Image.open(image)
image = Image.open(image_file)
result = process_image(image)
result = convert_to_cpai_compatible(result)
if not result['predictions']:
logger.debug("No plate found in the image, attempting to split the image")
logger.debug("No plate found, attempting grid split")
predictions_found = find_best_plate_with_grid_split(image, grid_size, wanted_cells)
if predictions_found:
result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
# Add the isolated plate image to the result
if result['predictions']:
isolated_plate_image = isolate_plate_in_image(image, result['predictions'][0])
result['image'] = f"data:image/png;base64,{image_to_base64(isolated_plate_image, compress=True)}"
result['processMs'] = round((time.time() - interference) * 1000, 2)
result['inferenceMs'] = result['processMs']
process_ms = round((time.time() - interference) * 1000, 2)
result.update({'processMs': process_ms, 'inferenceMs': process_ms})
return jsonify(result)
except Exception as e:
logger.error(f"Error processing image: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': 'Error processing image'}), 500
@app.route('/v1/image/alpr_grid_debug', methods=['POST'])
@ -216,35 +204,28 @@ def create_rest_server_flask():
- The image with the grid overlayed on it
"""
try:
if 'upload' not in request.files:
return jsonify({'error': 'No image found'}), 400
grid_size = int(request.form.get('grid_size', 3))
wanted_cells = request.form.get('wanted_cells')
if wanted_cells:
wanted_cells = [int(cell) for cell in wanted_cells.split(',')]
else:
wanted_cells = list(range(1, grid_size * grid_size + 1))
image = request.files['upload']
if image.filename == '':
image_file = request.files['upload']
if image_file.filename == '':
return jsonify({'error': 'No selected file'}), 400
image = Image.open(image)
image = Image.open(image_file)
image = draw_grid_and_cell_numbers_on_image(image, grid_size, wanted_cells)
image_base64 = image_to_base64(image, compress=True)
result = {
"image": f"data:image/png;base64,{image_base64}"
}
return jsonify(result)
return jsonify({"image": f"data:image/png;base64,{image_base64}"})
except Exception as e:
logger.error(f"Error processing image: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': 'Error processing image'}), 500
@app.route('/')
@ -256,7 +237,6 @@ def create_rest_server_flask():
def convert_to_cpai_compatible(result):
result = json.loads(result)
response = {
'success': "true",
'processMs': result['duration'],
@ -274,27 +254,22 @@ def convert_to_cpai_compatible(result):
'timestamp': ''
}
if 'plates' in result:
plates = result['plates']
for plate in plates:
for plate in result.get('plates', []):
warpedBox = plate['warpedBox']
x_coords = warpedBox[0::2]
y_coords = warpedBox[1::2]
x_min = min(x_coords)
x_max = max(x_coords)
y_min = min(y_coords)
y_max = max(y_coords)
x_min, x_max = min(x_coords), max(x_coords)
y_min, y_max = min(y_coords), max(y_coords)
response['predictions'].append({
'confidence': plate['confidences'][0] / 100,
'label': "Plate: " + plate['text'],
'label': f"Plate: {plate['text']}",
'plate': plate['text'],
'x_min': x_min,
'x_max': x_max,
'y_min': y_min,
'y_max': y_max
})
return response
@ -337,7 +312,6 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel
wanted_cells = list(range(1, grid_size * grid_size + 1))
predictions_found = []
width, height = image.size
cell_width = width // grid_size
cell_height = height // grid_size
@ -354,8 +328,7 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel
cell_image = image.crop((left, upper, right, lower))
result_cell = json.loads(process_image(cell_image))
if 'plates' in result_cell:
for plate in result_cell['plates']:
for plate in result_cell.get('plates', []):
warpedBox = plate['warpedBox']
x_coords = warpedBox[0::2]
y_coords = warpedBox[1::2]
@ -366,7 +339,7 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel
predictions_found.append({
'confidence': plate['confidences'][0] / 100,
'label': "Plate: " + plate['text'],
'label': f"Plate: {plate['text']}",
'plate': plate['text'],
'x_min': x_min,
'x_max': x_max,
@ -378,18 +351,16 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel
def isolate_plate_in_image(image: Image, plate: dict) -> Image:
x_min = plate['x_min']
x_max = plate['x_max']
y_min = plate['y_min']
y_max = plate['y_max']
x_min, x_max = plate['x_min'], plate['x_max']
y_min, y_max = plate['y_min'], plate['y_max']
offset = 10
image = image.crop((max(0, x_min - offset), max(0, y_min - offset), min(image.size[0], x_max + offset),
cropped_image = image.crop((max(0, x_min - offset), max(0, y_min - offset), min(image.size[0], x_max + offset),
min(image.size[1], y_max + offset)))
image = image.resize((int(image.size[0] * 3), int(image.size[1] * 3)), resample=Image.Resampling.LANCZOS)
resized_image = cropped_image.resize((int(cropped_image.size[0] * 3), int(cropped_image.size[1] * 3)),
resample=Image.Resampling.LANCZOS)
return image
return resized_image
def image_to_base64(img: Image, compress=False):
@ -397,13 +368,11 @@ def image_to_base64(img: Image, compress=False):
buffered = io.BytesIO()
if compress:
img = img.resize((int(img.size[0] / 2), int(img.size[1] / 2)))
img = img.resize((img.size[0] // 2, img.size[1] // 2))
img.save(buffered, format="WEBP", quality=35, lossless=False)
else:
img.save(buffered, format="WEBP")
print(buffered.__sizeof__())
return base64.b64encode(buffered.getvalue()).decode('utf-8')