From 9cf457511e01d10d2a4d5e36248be3d495333cef Mon Sep 17 00:00:00 2001 From: Mathieu Broillet Date: Sun, 4 Aug 2024 12:58:59 +0200 Subject: [PATCH] some refactor --- alpr_api.py | 151 +++++++++++++++++++++------------------------------- 1 file changed, 60 insertions(+), 91 deletions(-) diff --git a/alpr_api.py b/alpr_api.py index 84f48fb..7eb575f 100644 --- a/alpr_api.py +++ b/alpr_api.py @@ -30,9 +30,9 @@ information. The server is created using Flask and the ultimateALPR SDK is used See the README.md file for more information on how to run this script. """ -# Load configuration from a JSON file or environment variables +# Load configuration CONFIG_PATH = os.path.join(bundle_dir, - 'config.json') # TODO: store config file outside of bundle (to avoid compilation by users) + 'config.json') # TODO: store config file outside of bundle (to remove need for compilation by users) if os.path.exists(CONFIG_PATH): with open(CONFIG_PATH, 'r') as config_file: JSON_CONFIG = json.load(config_file) @@ -87,11 +87,11 @@ def start_backend_loop(): load_engine() # loop for about an hour or 3000 requests then reload the engine (fix for trial license) - while counter < 3000 and time.time() - boot_time < 60 * 60: + while counter < 3000 and time.time() - boot_time < 3600: # every 120 sec if int(time.time()) % 120 == 0: if not is_engine_loaded(): - unload_engine() # just in case + unload_engine() load_engine() time.sleep(1) @@ -126,24 +126,16 @@ def process_image(image: Image) -> str: counter += 1 width, height = image.size - - if image.mode in IMAGE_TYPES_MAPPING: - image_type = IMAGE_TYPES_MAPPING[image.mode] - else: - raise ValueError("Invalid mode: %s" % image.mode) + image_type = IMAGE_TYPES_MAPPING.get(image.mode, None) + if image_type is None: + raise ValueError(f"Invalid mode: {image.mode}") result = ultimateAlprSdk.UltAlprSdkEngine_process( - image_type, - image.tobytes(), - width, - height, - 0, # stride - 1 # exifOrientation + image_type, image.tobytes(), width, height, 0, 1 ) if not result.isOK(): - raise RuntimeError("Process failed: %s" % result.phrase()) - else: - return result.json() + raise RuntimeError(f"Process failed: {result.phrase()}") + return result.json() def create_rest_server_flask(): @@ -172,34 +164,30 @@ def create_rest_server_flask(): else: wanted_cells = list(range(1, grid_size * grid_size + 1)) - image = request.files['upload'] - if image.filename == '': + image_file = request.files['upload'] + if image_file.filename == '': return jsonify({'error': 'No selected file'}), 400 - image = Image.open(image) + image = Image.open(image_file) result = process_image(image) result = convert_to_cpai_compatible(result) if not result['predictions']: - logger.debug("No plate found in the image, attempting to split the image") + logger.debug("No plate found, attempting grid split") predictions_found = find_best_plate_with_grid_split(image, grid_size, wanted_cells) - if predictions_found: result['predictions'].append(max(predictions_found, key=lambda x: x['confidence'])) - # Add the isolated plate image to the result if result['predictions']: isolated_plate_image = isolate_plate_in_image(image, result['predictions'][0]) result['image'] = f"data:image/png;base64,{image_to_base64(isolated_plate_image, compress=True)}" - result['processMs'] = round((time.time() - interference) * 1000, 2) - result['inferenceMs'] = result['processMs'] - + process_ms = round((time.time() - interference) * 1000, 2) + result.update({'processMs': process_ms, 'inferenceMs': process_ms}) return jsonify(result) except Exception as e: logger.error(f"Error processing image: {e}") logger.error(traceback.format_exc()) - return jsonify({'error': 'Error processing image'}), 500 @app.route('/v1/image/alpr_grid_debug', methods=['POST']) @@ -216,35 +204,28 @@ def create_rest_server_flask(): - The image with the grid overlayed on it """ try: - if 'upload' not in request.files: return jsonify({'error': 'No image found'}), 400 grid_size = int(request.form.get('grid_size', 3)) - wanted_cells = request.form.get('wanted_cells') if wanted_cells: wanted_cells = [int(cell) for cell in wanted_cells.split(',')] else: wanted_cells = list(range(1, grid_size * grid_size + 1)) - image = request.files['upload'] - if image.filename == '': + image_file = request.files['upload'] + if image_file.filename == '': return jsonify({'error': 'No selected file'}), 400 - image = Image.open(image) + image = Image.open(image_file) image = draw_grid_and_cell_numbers_on_image(image, grid_size, wanted_cells) image_base64 = image_to_base64(image, compress=True) - result = { - "image": f"data:image/png;base64,{image_base64}" - } - - return jsonify(result) + return jsonify({"image": f"data:image/png;base64,{image_base64}"}) except Exception as e: logger.error(f"Error processing image: {e}") logger.error(traceback.format_exc()) - return jsonify({'error': 'Error processing image'}), 500 @app.route('/') @@ -256,7 +237,6 @@ def create_rest_server_flask(): def convert_to_cpai_compatible(result): result = json.loads(result) - response = { 'success': "true", 'processMs': result['duration'], @@ -274,27 +254,22 @@ def convert_to_cpai_compatible(result): 'timestamp': '' } - if 'plates' in result: - plates = result['plates'] - for plate in plates: - warpedBox = plate['warpedBox'] - x_coords = warpedBox[0::2] - y_coords = warpedBox[1::2] - x_min = min(x_coords) - x_max = max(x_coords) - y_min = min(y_coords) - y_max = max(y_coords) - - response['predictions'].append({ - 'confidence': plate['confidences'][0] / 100, - 'label': "Plate: " + plate['text'], - 'plate': plate['text'], - 'x_min': x_min, - 'x_max': x_max, - 'y_min': y_min, - 'y_max': y_max - }) + for plate in result.get('plates', []): + warpedBox = plate['warpedBox'] + x_coords = warpedBox[0::2] + y_coords = warpedBox[1::2] + x_min, x_max = min(x_coords), max(x_coords) + y_min, y_max = min(y_coords), max(y_coords) + response['predictions'].append({ + 'confidence': plate['confidences'][0] / 100, + 'label': f"Plate: {plate['text']}", + 'plate': plate['text'], + 'x_min': x_min, + 'x_max': x_max, + 'y_min': y_min, + 'y_max': y_max + }) return response @@ -337,7 +312,6 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel wanted_cells = list(range(1, grid_size * grid_size + 1)) predictions_found = [] - width, height = image.size cell_width = width // grid_size cell_height = height // grid_size @@ -354,42 +328,39 @@ def find_best_plate_with_grid_split(image: Image, grid_size: int = 3, wanted_cel cell_image = image.crop((left, upper, right, lower)) result_cell = json.loads(process_image(cell_image)) - if 'plates' in result_cell: - for plate in result_cell['plates']: - warpedBox = plate['warpedBox'] - x_coords = warpedBox[0::2] - y_coords = warpedBox[1::2] - x_min = min(x_coords) + left - x_max = max(x_coords) + left - y_min = min(y_coords) + upper - y_max = max(y_coords) + upper + for plate in result_cell.get('plates', []): + warpedBox = plate['warpedBox'] + x_coords = warpedBox[0::2] + y_coords = warpedBox[1::2] + x_min = min(x_coords) + left + x_max = max(x_coords) + left + y_min = min(y_coords) + upper + y_max = max(y_coords) + upper - predictions_found.append({ - 'confidence': plate['confidences'][0] / 100, - 'label': "Plate: " + plate['text'], - 'plate': plate['text'], - 'x_min': x_min, - 'x_max': x_max, - 'y_min': y_min, - 'y_max': y_max - }) + predictions_found.append({ + 'confidence': plate['confidences'][0] / 100, + 'label': f"Plate: {plate['text']}", + 'plate': plate['text'], + 'x_min': x_min, + 'x_max': x_max, + 'y_min': y_min, + 'y_max': y_max + }) return predictions_found def isolate_plate_in_image(image: Image, plate: dict) -> Image: - x_min = plate['x_min'] - x_max = plate['x_max'] - y_min = plate['y_min'] - y_max = plate['y_max'] - + x_min, x_max = plate['x_min'], plate['x_max'] + y_min, y_max = plate['y_min'], plate['y_max'] offset = 10 - image = image.crop((max(0, x_min - offset), max(0, y_min - offset), min(image.size[0], x_max + offset), - min(image.size[1], y_max + offset))) - image = image.resize((int(image.size[0] * 3), int(image.size[1] * 3)), resample=Image.Resampling.LANCZOS) + cropped_image = image.crop((max(0, x_min - offset), max(0, y_min - offset), min(image.size[0], x_max + offset), + min(image.size[1], y_max + offset))) + resized_image = cropped_image.resize((int(cropped_image.size[0] * 3), int(cropped_image.size[1] * 3)), + resample=Image.Resampling.LANCZOS) - return image + return resized_image def image_to_base64(img: Image, compress=False): @@ -397,13 +368,11 @@ def image_to_base64(img: Image, compress=False): buffered = io.BytesIO() if compress: - img = img.resize((int(img.size[0] / 2), int(img.size[1] / 2))) + img = img.resize((img.size[0] // 2, img.size[1] // 2)) img.save(buffered, format="WEBP", quality=35, lossless=False) else: img.save(buffered, format="WEBP") - print(buffered.__sizeof__()) - return base64.b64encode(buffered.getvalue()).decode('utf-8')