diff --git a/alpr_api.py b/alpr_api.py
index 086ce01..86a8b3c 100644
--- a/alpr_api.py
+++ b/alpr_api.py
@@ -58,25 +58,20 @@ def load_engine():
     bundle_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
 
     JSON_CONFIG["assets_folder"] = os.path.join(bundle_dir, "assets")
-    JSON_CONFIG["charset"] = "latin"
-    JSON_CONFIG["car_noplate_detect_enabled"] = False  # Whether to detect and return cars with no plate
-    JSON_CONFIG[
-        "ienv_enabled"] = False  # Whether to enable Image Enhancement for Night-Vision (IENV). More info about IENV at https://www.doubango.org/SDKs/anpr/docs/Features.html#image-enhancement-for-night-vision-ienv. Default: true for x86-64 and false for ARM.
-    JSON_CONFIG[
-        "openvino_enabled"] = False  # Whether to enable OpenVINO. Tensorflow will be used when OpenVINO is disabled
-    JSON_CONFIG[
-        "openvino_device"] = "GPU"  # Defines the OpenVINO device to use (CPU, GPU, FPGA...). More info at https://www.doubango.org/SDKs/anpr/docs/Configuration_options.html#openvino-device
-    JSON_CONFIG["npu_enabled"] = False  # Whether to enable NPU (Neural Processing Unit) acceleration
-    JSON_CONFIG[
-        "klass_lpci_enabled"] = False  # Whether to enable License Plate Country Identification (LPCI). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#license-plate-country-identification-lpci
-    JSON_CONFIG[
-        "klass_vcr_enabled"] = False  # Whether to enable Vehicle Color Recognition (VCR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-color-recognition-vcr
-    JSON_CONFIG[
-        "klass_vmmr_enabled"] = False  # Whether to enable Vehicle Make Model Recognition (VMMR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-make-model-recognition-vmmr
-    JSON_CONFIG[
-        "klass_vbsr_enabled"] = False  # Whether to enable Vehicle Body Style Recognition (VBSR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-body-style-recognition-vbsr
-    JSON_CONFIG["license_token_file"] = ""  # Path to license token file
-    JSON_CONFIG["license_token_data"] = ""  # Base64 license token data
+    JSON_CONFIG.update({
+        "charset": "latin",
+        "car_noplate_detect_enabled": False,
+        "ienv_enabled": False,
+        "openvino_enabled": False,
+        "openvino_device": "GPU",
+        "npu_enabled": False,
+        "klass_lpci_enabled": False,
+        "klass_vcr_enabled": False,
+        "klass_vmmr_enabled": False,
+        "klass_vbsr_enabled": False,
+        "license_token_file": "",
+        "license_token_data": ""
+    })
 
     result = ultimateAlprSdk.UltAlprSdkEngine_init(json.dumps(JSON_CONFIG))
     if not result.isOK():
@@ -108,11 +103,11 @@ def process_image(image: Image) -> str:
 
     result = ultimateAlprSdk.UltAlprSdkEngine_process(
         image_type,
-        image.tobytes(),  # type(x) == bytes
+        image.tobytes(),
         width,
         height,
         0,  # stride
-        1  # exifOrientation (already rotated in load_image -> use default value: 1)
+        1  # exifOrientation
     )
     if not result.isOK():
         raise RuntimeError("Process failed: %s" % result.phrase())
@@ -123,81 +118,32 @@ def process_image(image: Image) -> str:
 def create_rest_server_flask():
     app = Flask(__name__)
 
-    @app.route('/v1/<domain>/<module>', methods=['POST'])
-    def alpr(domain, module):
-        # Only care about the ALPR endpoint
-        if domain == 'image' and module == 'alpr':
-            interference = time.time()
-            if 'upload' not in request.files:
-                return jsonify({'error': 'No image found'})
+    @app.route('/v1/image/alpr', methods=['POST'])
+    def alpr():
+        interference = time.time()
 
-            image = request.files['upload']
-            if image.filename == '':
-                return jsonify({'error': 'No selected file'})
+        if 'upload' not in request.files:
+            return jsonify({'error': 'No image found'})
 
-            image = Image.open(image)
-            result = convert_to_cpai_compatible(process_image(image))
+        image = request.files['upload']
+        if image.filename == '':
+            return jsonify({'error': 'No selected file'})
 
-            if len(result['predictions']) == 0:
-                print("No plate found in the image, trying to split the image")
+        image = Image.open(image)
+        result = process_image(image)
+        result = convert_to_cpai_compatible(result)
 
-                predictions_found = []
+        if not result['predictions']:
+            print("No plate found in the image, attempting to split the image")
 
-                width, height = image.size
-                cell_width = width // 3
-                cell_height = height // 3
+            predictions_found = find_best_plate_with_split(image)
 
-                # Define which cells to process (2, 4, 5, 6, 8, 9)
-                cells_to_process = [2, 4, 5, 6, 8, 9]
+            if predictions_found:
+                result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
 
-                # Loop through each cell
-                for cell_index in range(1, 10):
-                    # Calculate row and column of the cell
-                    row = (cell_index - 1) // 3
-                    col = (cell_index - 1) % 3
-
-                    # Calculate bounding box of the cell
-                    left = col * cell_width
-                    upper = row * cell_height
-                    right = left + cell_width
-                    lower = upper + cell_height
-
-                    # Check if this cell should be processed
-                    if cell_index in cells_to_process:
-                        # Extract the cell as a new image
-                        cell_image = image.crop((left, upper, right, lower))
-
-                        result_cell = json.loads(process_image(cell_image))
-
-                        if 'plates' in result_cell:
-                            for plate in result_cell['plates']:
-                                warpedBox = plate['warpedBox']
-                                x_coords = warpedBox[0::2]
-                                y_coords = warpedBox[1::2]
-                                x_min = min(x_coords) + left
-                                x_max = max(x_coords) + left
-                                y_min = min(y_coords) + upper
-                                y_max = max(y_coords) + upper
-
-                                predictions_found.append({
-                                    'confidence': plate['confidences'][0] / 100,
-                                    'label': "Plate: " + plate['text'],
-                                    'plate': plate['text'],
-                                    'x_min': x_min,
-                                    'x_max': x_max,
-                                    'y_min': y_min,
-                                    'y_max': y_max
-                                })
-
-                if len(predictions_found) > 0:
-                    # add the prediction with the highest confidence
-                    result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
-
-            result['processMs'] = round((time.time() - interference) * 1000, 2)
-            result['inferenceMs'] = result['processMs']  # same as processMs
-            return jsonify(result)
-        else:
-            return jsonify({'error': 'Endpoint not implemented'}), 404
+        result['processMs'] = round((time.time() - interference) * 1000, 2)
+        result['inferenceMs'] = result['processMs']
+        return jsonify(result)
 
     @app.route('/')
     def index():
@@ -228,7 +174,6 @@ def convert_to_cpai_compatible(result):
 
     if 'plates' in result:
         plates = result['plates']
-
         for plate in plates:
             warpedBox = plate['warpedBox']
             x_coords = warpedBox[0::2]
@@ -251,6 +196,51 @@ def convert_to_cpai_compatible(result):
     return response
 
 
+def find_best_plate_with_split(image, split_size=4, wanted_cells=None):
+    if wanted_cells is None:
+        wanted_cells = [5, 6, 7, 9, 10, 11, 14, 15]  # TODO: use params not specific to my use case
+
+    predictions_found = []
+
+    width, height = image.size
+    cell_width = width // split_size
+    cell_height = height // split_size
+
+    for cell_index in range(1, split_size * split_size + 1):
+        row = (cell_index - 1) // split_size
+        col = (cell_index - 1) % split_size
+        left = col * cell_width
+        upper = row * cell_height
+        right = left + cell_width
+        lower = upper + cell_height
+
+        if cell_index in wanted_cells:
+            cell_image = image.crop((left, upper, right, lower))
+            result_cell = json.loads(process_image(cell_image))
+
+            if 'plates' in result_cell:
+                for plate in result_cell['plates']:
+                    warpedBox = plate['warpedBox']
+                    x_coords = warpedBox[0::2]
+                    y_coords = warpedBox[1::2]
+                    x_min = min(x_coords) + left
+                    x_max = max(x_coords) + left
+                    y_min = min(y_coords) + upper
+                    y_max = max(y_coords) + upper
+
+                    predictions_found.append({
+                        'confidence': plate['confidences'][0] / 100,
+                        'label': "Plate: " + plate['text'],
+                        'plate': plate['text'],
+                        'x_min': x_min,
+                        'x_max': x_max,
+                        'y_min': y_min,
+                        'y_max': y_max
+                    })
+
+    return predictions_found
+
+
 if __name__ == '__main__':
     engine = threading.Thread(target=load_engine, daemon=True)
     engine.start()