Compare commits
3 Commits
0479a48ae2
...
aa0cf11dd8
Author | SHA1 | Date | |
---|---|---|---|
aa0cf11dd8 | |||
42b1571ee2 | |||
b1b762ce9c |
170
alpr_api.py
170
alpr_api.py
@ -58,25 +58,20 @@ def load_engine():
|
|||||||
bundle_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
|
bundle_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
|
||||||
|
|
||||||
JSON_CONFIG["assets_folder"] = os.path.join(bundle_dir, "assets")
|
JSON_CONFIG["assets_folder"] = os.path.join(bundle_dir, "assets")
|
||||||
JSON_CONFIG["charset"] = "latin"
|
JSON_CONFIG.update({
|
||||||
JSON_CONFIG["car_noplate_detect_enabled"] = False # Whether to detect and return cars with no plate
|
"charset": "latin",
|
||||||
JSON_CONFIG[
|
"car_noplate_detect_enabled": False,
|
||||||
"ienv_enabled"] = False # Whether to enable Image Enhancement for Night-Vision (IENV). More info about IENV at https://www.doubango.org/SDKs/anpr/docs/Features.html#image-enhancement-for-night-vision-ienv. Default: true for x86-64 and false for ARM.
|
"ienv_enabled": False,
|
||||||
JSON_CONFIG[
|
"openvino_enabled": False,
|
||||||
"openvino_enabled"] = False # Whether to enable OpenVINO. Tensorflow will be used when OpenVINO is disabled
|
"openvino_device": "GPU",
|
||||||
JSON_CONFIG[
|
"npu_enabled": False,
|
||||||
"openvino_device"] = "GPU" # Defines the OpenVINO device to use (CPU, GPU, FPGA...). More info at https://www.doubango.org/SDKs/anpr/docs/Configuration_options.html#openvino-device
|
"klass_lpci_enabled": False,
|
||||||
JSON_CONFIG["npu_enabled"] = False # Whether to enable NPU (Neural Processing Unit) acceleration
|
"klass_vcr_enabled": False,
|
||||||
JSON_CONFIG[
|
"klass_vmmr_enabled": False,
|
||||||
"klass_lpci_enabled"] = False # Whether to enable License Plate Country Identification (LPCI). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#license-plate-country-identification-lpci
|
"klass_vbsr_enabled": False,
|
||||||
JSON_CONFIG[
|
"license_token_file": "",
|
||||||
"klass_vcr_enabled"] = False # Whether to enable Vehicle Color Recognition (VCR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-color-recognition-vcr
|
"license_token_data": ""
|
||||||
JSON_CONFIG[
|
})
|
||||||
"klass_vmmr_enabled"] = False # Whether to enable Vehicle Make Model Recognition (VMMR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-make-model-recognition-vmmr
|
|
||||||
JSON_CONFIG[
|
|
||||||
"klass_vbsr_enabled"] = False # Whether to enable Vehicle Body Style Recognition (VBSR). More info at https://www.doubango.org/SDKs/anpr/docs/Features.html#vehicle-body-style-recognition-vbsr
|
|
||||||
JSON_CONFIG["license_token_file"] = "" # Path to license token file
|
|
||||||
JSON_CONFIG["license_token_data"] = "" # Base64 license token data
|
|
||||||
|
|
||||||
result = ultimateAlprSdk.UltAlprSdkEngine_init(json.dumps(JSON_CONFIG))
|
result = ultimateAlprSdk.UltAlprSdkEngine_init(json.dumps(JSON_CONFIG))
|
||||||
if not result.isOK():
|
if not result.isOK():
|
||||||
@ -108,11 +103,11 @@ def process_image(image: Image) -> str:
|
|||||||
|
|
||||||
result = ultimateAlprSdk.UltAlprSdkEngine_process(
|
result = ultimateAlprSdk.UltAlprSdkEngine_process(
|
||||||
image_type,
|
image_type,
|
||||||
image.tobytes(), # type(x) == bytes
|
image.tobytes(),
|
||||||
width,
|
width,
|
||||||
height,
|
height,
|
||||||
0, # stride
|
0, # stride
|
||||||
1 # exifOrientation (already rotated in load_image -> use default value: 1)
|
1 # exifOrientation
|
||||||
)
|
)
|
||||||
if not result.isOK():
|
if not result.isOK():
|
||||||
raise RuntimeError("Process failed: %s" % result.phrase())
|
raise RuntimeError("Process failed: %s" % result.phrase())
|
||||||
@ -123,81 +118,32 @@ def process_image(image: Image) -> str:
|
|||||||
def create_rest_server_flask():
|
def create_rest_server_flask():
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
@app.route('/v1/<string:domain>/<string:module>', methods=['POST'])
|
@app.route('/v1/image/alpr', methods=['POST'])
|
||||||
def alpr(domain, module):
|
def alpr():
|
||||||
# Only care about the ALPR endpoint
|
interference = time.time()
|
||||||
if domain == 'image' and module == 'alpr':
|
|
||||||
interference = time.time()
|
|
||||||
if 'upload' not in request.files:
|
|
||||||
return jsonify({'error': 'No image found'})
|
|
||||||
|
|
||||||
image = request.files['upload']
|
if 'upload' not in request.files:
|
||||||
if image.filename == '':
|
return jsonify({'error': 'No image found'})
|
||||||
return jsonify({'error': 'No selected file'})
|
|
||||||
|
|
||||||
image = Image.open(image)
|
image = request.files['upload']
|
||||||
result = convert_to_cpai_compatible(process_image(image))
|
if image.filename == '':
|
||||||
|
return jsonify({'error': 'No selected file'})
|
||||||
|
|
||||||
if len(result['predictions']) == 0:
|
image = Image.open(image)
|
||||||
print("No plate found in the image, trying to split the image")
|
result = process_image(image)
|
||||||
|
result = convert_to_cpai_compatible(result)
|
||||||
|
|
||||||
predictions_found = []
|
if not result['predictions']:
|
||||||
|
print("No plate found in the image, attempting to split the image")
|
||||||
|
|
||||||
width, height = image.size
|
predictions_found = find_best_plate_with_split(image)
|
||||||
cell_width = width // 3
|
|
||||||
cell_height = height // 3
|
|
||||||
|
|
||||||
# Define which cells to process (2, 4, 5, 6, 8, 9)
|
if predictions_found:
|
||||||
cells_to_process = [2, 4, 5, 6, 8, 9]
|
result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
|
||||||
|
|
||||||
# Loop through each cell
|
result['processMs'] = round((time.time() - interference) * 1000, 2)
|
||||||
for cell_index in range(1, 10):
|
result['inferenceMs'] = result['processMs']
|
||||||
# Calculate row and column of the cell
|
return jsonify(result)
|
||||||
row = (cell_index - 1) // 3
|
|
||||||
col = (cell_index - 1) % 3
|
|
||||||
|
|
||||||
# Calculate bounding box of the cell
|
|
||||||
left = col * cell_width
|
|
||||||
upper = row * cell_height
|
|
||||||
right = left + cell_width
|
|
||||||
lower = upper + cell_height
|
|
||||||
|
|
||||||
# Check if this cell should be processed
|
|
||||||
if cell_index in cells_to_process:
|
|
||||||
# Extract the cell as a new image
|
|
||||||
cell_image = image.crop((left, upper, right, lower))
|
|
||||||
|
|
||||||
result_cell = json.loads(process_image(cell_image))
|
|
||||||
|
|
||||||
if 'plates' in result_cell:
|
|
||||||
for plate in result_cell['plates']:
|
|
||||||
warpedBox = plate['warpedBox']
|
|
||||||
x_coords = warpedBox[0::2]
|
|
||||||
y_coords = warpedBox[1::2]
|
|
||||||
x_min = min(x_coords) + left
|
|
||||||
x_max = max(x_coords) + left
|
|
||||||
y_min = min(y_coords) + upper
|
|
||||||
y_max = max(y_coords) + upper
|
|
||||||
|
|
||||||
predictions_found.append({
|
|
||||||
'confidence': plate['confidences'][0] / 100,
|
|
||||||
'label': "Plate: " + plate['text'],
|
|
||||||
'plate': plate['text'],
|
|
||||||
'x_min': x_min,
|
|
||||||
'x_max': x_max,
|
|
||||||
'y_min': y_min,
|
|
||||||
'y_max': y_max
|
|
||||||
})
|
|
||||||
|
|
||||||
if len(predictions_found) > 0:
|
|
||||||
# add the prediction with the highest confidence
|
|
||||||
result['predictions'].append(max(predictions_found, key=lambda x: x['confidence']))
|
|
||||||
|
|
||||||
result['processMs'] = round((time.time() - interference) * 1000, 2)
|
|
||||||
result['inferenceMs'] = result['processMs'] # same as processMs
|
|
||||||
return jsonify(result)
|
|
||||||
else:
|
|
||||||
return jsonify({'error': 'Endpoint not implemented'}), 404
|
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
def index():
|
def index():
|
||||||
@ -228,7 +174,6 @@ def convert_to_cpai_compatible(result):
|
|||||||
|
|
||||||
if 'plates' in result:
|
if 'plates' in result:
|
||||||
plates = result['plates']
|
plates = result['plates']
|
||||||
|
|
||||||
for plate in plates:
|
for plate in plates:
|
||||||
warpedBox = plate['warpedBox']
|
warpedBox = plate['warpedBox']
|
||||||
x_coords = warpedBox[0::2]
|
x_coords = warpedBox[0::2]
|
||||||
@ -251,6 +196,51 @@ def convert_to_cpai_compatible(result):
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def find_best_plate_with_split(image, split_size=4, wanted_cells=None):
|
||||||
|
if wanted_cells is None:
|
||||||
|
wanted_cells = [5, 6, 7, 9, 10, 11, 14, 15] # TODO: use params not specifc to my use case
|
||||||
|
|
||||||
|
predictions_found = []
|
||||||
|
|
||||||
|
width, height = image.size
|
||||||
|
cell_width = width // split_size
|
||||||
|
cell_height = height // split_size
|
||||||
|
|
||||||
|
for cell_index in range(1, split_size * split_size + 1):
|
||||||
|
row = (cell_index - 1) // split_size
|
||||||
|
col = (cell_index - 1) % split_size
|
||||||
|
left = col * cell_width
|
||||||
|
upper = row * cell_height
|
||||||
|
right = left + cell_width
|
||||||
|
lower = upper + cell_height
|
||||||
|
|
||||||
|
if cell_index in wanted_cells:
|
||||||
|
cell_image = image.crop((left, upper, right, lower))
|
||||||
|
result_cell = json.loads(process_image(cell_image))
|
||||||
|
|
||||||
|
if 'plates' in result_cell:
|
||||||
|
for plate in result_cell['plates']:
|
||||||
|
warpedBox = plate['warpedBox']
|
||||||
|
x_coords = warpedBox[0::2]
|
||||||
|
y_coords = warpedBox[1::2]
|
||||||
|
x_min = min(x_coords) + left
|
||||||
|
x_max = max(x_coords) + left
|
||||||
|
y_min = min(y_coords) + upper
|
||||||
|
y_max = max(y_coords) + upper
|
||||||
|
|
||||||
|
predictions_found.append({
|
||||||
|
'confidence': plate['confidences'][0] / 100,
|
||||||
|
'label': "Plate: " + plate['text'],
|
||||||
|
'plate': plate['text'],
|
||||||
|
'x_min': x_min,
|
||||||
|
'x_max': x_max,
|
||||||
|
'y_min': y_min,
|
||||||
|
'y_max': y_max
|
||||||
|
})
|
||||||
|
|
||||||
|
return predictions_found
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
engine = threading.Thread(target=load_engine, daemon=True)
|
engine = threading.Thread(target=load_engine, daemon=True)
|
||||||
engine.start()
|
engine.start()
|
||||||
|
Loading…
Reference in New Issue
Block a user