Compare commits

...

3 Commits

Author SHA1 Message Date
aa0cf11dd8 Merge pull request 'fix detached head' (#2) from detached2 into main
Reviewed-on: #2
2024-07-17 20:48:13 +00:00
42b1571ee2
improve splitting 2024-07-17 22:46:00 +02:00
b1b762ce9c
improve readability 2024-07-17 22:31:22 +02:00

View File

@ -58,25 +58,20 @@ def load_engine():
bundle_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__))) bundle_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
JSON_CONFIG["assets_folder"] = os.path.join(bundle_dir, "assets") JSON_CONFIG["assets_folder"] = os.path.join(bundle_dir, "assets")
JSON_CONFIG["charset"] = "latin" JSON_CONFIG.update({
JSON_CONFIG["car_noplate_detect_enabled"] = False # Whether to detect and return cars with no plate "charset": "latin",
JSON_CONFIG[ "car_noplate_detect_enabled": False,
"ienv_enabled"] = False # Whether to enable Image Enhancement for Night-Vision (IENV). More info about IENV at https://www.doubango.org/SDKs/anpr/docs/Features.html#image-enhancement-for-night-vision-ienv. Default: true for x86-64 and false for ARM. "ienv_enabled": False,
JSON_CONFIG[ "openvino_enabled": False,
"openvino_enabled"] = False # Whether to enable OpenVINO. Tensorflow will be used when OpenVINO is disabled "openvino_device": "GPU",
JSON_CONFIG[ "npu_enabled": False,
"openvino_device"] = "GPU" # Defines the OpenVINO device to use (CPU, GPU, FPGA...). More info at https://www.doubango.org/SDKs/anpr/docs/Configuration_options.html#openvino-device "klass_lpci_enabled": False,
JSON_CONFIG["npu_enabled"] = False # Whether to enable NPU (Neural Processing Unit) acceleration "klass_vcr_enabled": False,
JSON_CONFIG[ "klass_vmmr_enabled": False,
"klass_lpci_enabled"] = False # Whether to enable License Plate Country Identification (LPCI). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#license-plate-country-identification-lpci "klass_vbsr_enabled": False,
JSON_CONFIG[ "license_token_file": "",
"klass_vcr_enabled"] = False # Whether to enable Vehicle Color Recognition (VCR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-color-recognition-vcr "license_token_data": ""
JSON_CONFIG[ })
"klass_vmmr_enabled"] = False # Whether to enable Vehicle Make Model Recognition (VMMR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-make-model-recognition-vmmr
JSON_CONFIG[
"klass_vbsr_enabled"] = False # Whether to enable Vehicle Body Style Recognition (VBSR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-body-style-recognition-vbsr
JSON_CONFIG["license_token_file"] = "" # Path to license token file
JSON_CONFIG["license_token_data"] = "" # Base64 license token data
result = ultimateAlprSdk.UltAlprSdkEngine_init(json.dumps(JSON_CONFIG)) result = ultimateAlprSdk.UltAlprSdkEngine_init(json.dumps(JSON_CONFIG))
if not result.isOK(): if not result.isOK():
@ -108,11 +103,11 @@ def process_image(image: Image) -> str:
result = ultimateAlprSdk.UltAlprSdkEngine_process( result = ultimateAlprSdk.UltAlprSdkEngine_process(
image_type, image_type,
image.tobytes(), # type(x) == bytes image.tobytes(),
width, width,
height, height,
0, # stride 0, # stride
1 # exifOrientation (already rotated in load_image -> use default value: 1) 1 # exifOrientation
) )
if not result.isOK(): if not result.isOK():
raise RuntimeError("Process failed: %s" % result.phrase()) raise RuntimeError("Process failed: %s" % result.phrase())
@ -123,81 +118,32 @@ def process_image(image: Image) -> str:
def create_rest_server_flask(): def create_rest_server_flask():
app = Flask(__name__) app = Flask(__name__)
@app.route('/v1/<string:domain>/<string:module>', methods=['POST']) @app.route('/v1/image/alpr', methods=['POST'])
def alpr(domain, module): def alpr():
# Only care about the ALPR endpoint interference = time.time()
if domain == 'image' and module == 'alpr':
interference = time.time()
if 'upload' not in request.files:
return jsonify({'error': 'No image found'})
image = request.files['upload'] if 'upload' not in request.files:
if image.filename == '': return jsonify({'error': 'No image found'})
return jsonify({'error': 'No selected file'})
image = Image.open(image) image = request.files['upload']
result = convert_to_cpai_compatible(process_image(image)) if image.filename == '':
return jsonify({'error': 'No selected file'})
if len(result['predictions']) == 0: image = Image.open(image)
print("No plate found in the image, trying to split the image") result = process_image(image)
result = convert_to_cpai_compatible(result)
predictions_found = [] if not result['predictions']:
print("No plate found in the image, attempting to split the image")
width, height = image.size predictions_found = find_best_plate_with_split(image)
cell_width = width // 3
cell_height = height // 3
# Define which cells to process (2, 4, 5, 6, 8, 9) if predictions_found:
cells_to_process = [2, 4, 5, 6, 8, 9] result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
# Loop through each cell result['processMs'] = round((time.time() - interference) * 1000, 2)
for cell_index in range(1, 10): result['inferenceMs'] = result['processMs']
# Calculate row and column of the cell return jsonify(result)
row = (cell_index - 1) // 3
col = (cell_index - 1) % 3
# Calculate bounding box of the cell
left = col * cell_width
upper = row * cell_height
right = left + cell_width
lower = upper + cell_height
# Check if this cell should be processed
if cell_index in cells_to_process:
# Extract the cell as a new image
cell_image = image.crop((left, upper, right, lower))
result_cell = json.loads(process_image(cell_image))
if 'plates' in result_cell:
for plate in result_cell['plates']:
warpedBox = plate['warpedBox']
x_coords = warpedBox[0::2]
y_coords = warpedBox[1::2]
x_min = min(x_coords) + left
x_max = max(x_coords) + left
y_min = min(y_coords) + upper
y_max = max(y_coords) + upper
predictions_found.append({
'confidence': plate['confidences'][0] / 100,
'label': "Plate: " + plate['text'],
'plate': plate['text'],
'x_min': x_min,
'x_max': x_max,
'y_min': y_min,
'y_max': y_max
})
if len(predictions_found) > 0:
# add the prediction with the highest confidence
result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
result['processMs'] = round((time.time() - interference) * 1000, 2)
result['inferenceMs'] = result['processMs'] # same as processMs
return jsonify(result)
else:
return jsonify({'error': 'Endpoint not implemented'}), 404
@app.route('/') @app.route('/')
def index(): def index():
@ -228,7 +174,6 @@ def convert_to_cpai_compatible(result):
if 'plates' in result: if 'plates' in result:
plates = result['plates'] plates = result['plates']
for plate in plates: for plate in plates:
warpedBox = plate['warpedBox'] warpedBox = plate['warpedBox']
x_coords = warpedBox[0::2] x_coords = warpedBox[0::2]
@ -251,6 +196,51 @@ def convert_to_cpai_compatible(result):
return response return response
def find_best_plate_with_split(image, split_size=4, wanted_cells=None):
if wanted_cells is None:
wanted_cells = [5, 6, 7, 9, 10, 11, 14, 15] # TODO: use params not specifc to my use case
predictions_found = []
width, height = image.size
cell_width = width // split_size
cell_height = height // split_size
for cell_index in range(1, split_size * split_size + 1):
row = (cell_index - 1) // split_size
col = (cell_index - 1) % split_size
left = col * cell_width
upper = row * cell_height
right = left + cell_width
lower = upper + cell_height
if cell_index in wanted_cells:
cell_image = image.crop((left, upper, right, lower))
result_cell = json.loads(process_image(cell_image))
if 'plates' in result_cell:
for plate in result_cell['plates']:
warpedBox = plate['warpedBox']
x_coords = warpedBox[0::2]
y_coords = warpedBox[1::2]
x_min = min(x_coords) + left
x_max = max(x_coords) + left
y_min = min(y_coords) + upper
y_max = max(y_coords) + upper
predictions_found.append({
'confidence': plate['confidences'][0] / 100,
'label': "Plate: " + plate['text'],
'plate': plate['text'],
'x_min': x_min,
'x_max': x_max,
'y_min': y_min,
'y_max': y_max
})
return predictions_found
if __name__ == '__main__': if __name__ == '__main__':
engine = threading.Thread(target=load_engine, daemon=True) engine = threading.Thread(target=load_engine, daemon=True)
engine.start() engine.start()