|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +import cv2 |
| 4 | +import os |
| 5 | +import sys, getopt |
| 6 | +import signal |
| 7 | +import time |
| 8 | +from edge_impulse_linux.image import ImageImpulseRunner |
| 9 | + |
| 10 | +runner = None |
| 11 | +show_camera = False |
| 12 | + |
| 13 | +def now(): |
| 14 | + return round(time.time() * 1000) |
| 15 | + |
| 16 | +def get_webcams(): |
| 17 | + port_ids = [] |
| 18 | + for port in range(5): |
| 19 | + print("Looking for a camera in port %s:" %port) |
| 20 | + camera = cv2.VideoCapture(port) |
| 21 | + if camera.isOpened(): |
| 22 | + ret = camera.read()[0] |
| 23 | + if ret: |
| 24 | + backendName =camera.getBackendName() |
| 25 | + w = camera.get(3) |
| 26 | + h = camera.get(4) |
| 27 | + print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port)) |
| 28 | + port_ids.append(port) |
| 29 | + camera.release() |
| 30 | + return port_ids |
| 31 | + |
| 32 | +def sigint_handler(sig, frame): |
| 33 | + print('Interrupted') |
| 34 | + if (runner): |
| 35 | + runner.stop() |
| 36 | + sys.exit(0) |
| 37 | + |
| 38 | +signal.signal(signal.SIGINT, sigint_handler) |
| 39 | + |
| 40 | +def help(): |
| 41 | + print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>') |
| 42 | + |
| 43 | +def main(argv): |
| 44 | + try: |
| 45 | + opts, args = getopt.getopt(argv, "h", ["--help"]) |
| 46 | + except getopt.GetoptError: |
| 47 | + help() |
| 48 | + sys.exit(2) |
| 49 | + |
| 50 | + for opt, arg in opts: |
| 51 | + if opt in ('-h', '--help'): |
| 52 | + help() |
| 53 | + sys.exit() |
| 54 | + |
| 55 | + if len(args) == 0: |
| 56 | + help() |
| 57 | + sys.exit(2) |
| 58 | + |
| 59 | + model = args[0] |
| 60 | + |
| 61 | + dir_path = os.path.dirname(os.path.realpath(__file__)) |
| 62 | + modelfile = os.path.join(dir_path, model) |
| 63 | + |
| 64 | + print('MODEL: ' + modelfile) |
| 65 | + |
| 66 | + with ImageImpulseRunner(modelfile) as runner: |
| 67 | + try: |
| 68 | + model_info = runner.init() |
| 69 | + print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"') |
| 70 | + labels = model_info['model_parameters']['labels'] |
| 71 | + if len(args)>= 2: |
| 72 | + videoCaptureDeviceId = int(args[1]) |
| 73 | + else: |
| 74 | + port_ids = get_webcams() |
| 75 | + if len(port_ids) == 0: |
| 76 | + raise Exception('Cannot find any webcams') |
| 77 | + if len(args)<= 1 and len(port_ids)> 1: |
| 78 | + raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script") |
| 79 | + videoCaptureDeviceId = int(port_ids[0]) |
| 80 | + |
| 81 | + camera = cv2.VideoCapture(videoCaptureDeviceId) |
| 82 | + ret = camera.read()[0] |
| 83 | + if ret: |
| 84 | + backendName = camera.getBackendName() |
| 85 | + w = camera.get(3) |
| 86 | + h = camera.get(4) |
| 87 | + print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId)) |
| 88 | + camera.release() |
| 89 | + else: |
| 90 | + raise Exception("Couldn't initialize selected camera.") |
| 91 | + |
| 92 | + next_frame = 0 # limit to ~10 fps here |
| 93 | + |
| 94 | + for img in runner.get_frames(videoCaptureDeviceId): |
| 95 | + if (next_frame > now()): |
| 96 | + time.sleep((next_frame - now()) / 1000) |
| 97 | + |
| 98 | + # make two cuts from the image, one on the left and one on the right |
| 99 | + features_l, cropped_l = runner.get_features_from_image(img, 'left') |
| 100 | + features_r, cropped_r = runner.get_features_from_image(img, 'right') |
| 101 | + |
| 102 | + # classify both |
| 103 | + res_l = runner.classify(features_l) |
| 104 | + res_r = runner.classify(features_r) |
| 105 | + |
| 106 | + cv2.imwrite('debug_l.jpg', cropped_l) |
| 107 | + cv2.imwrite('debug_r.jpg', cropped_r) |
| 108 | + |
| 109 | + def print_classification(res, tag): |
| 110 | + if "classification" in res["result"].keys(): |
| 111 | + print('%s: Result (%d ms.) ' % (tag, res['timing']['dsp'] + res['timing']['classification']), end='') |
| 112 | + for label in labels: |
| 113 | + score = res['result']['classification'][label] |
| 114 | + print('%s: %.2f\t' % (label, score), end='') |
| 115 | + print('', flush=True) |
| 116 | + elif "bounding_boxes" in res["result"].keys(): |
| 117 | + print('%s: Found %d bounding boxes (%d ms.)' % (tag, len(res["result"]["bounding_boxes"]), res['timing']['dsp'] + res['timing']['classification'])) |
| 118 | + for bb in res["result"]["bounding_boxes"]: |
| 119 | + print('\t%s (%.2f): x=%d y=%d w=%d h=%d' % (bb['label'], bb['value'], bb['x'], bb['y'], bb['width'], bb['height'])) |
| 120 | + |
| 121 | + print_classification(res_l, 'LEFT') |
| 122 | + print_classification(res_r, 'RIGHT') |
| 123 | + |
| 124 | + next_frame = now() + 100 |
| 125 | + |
| 126 | + finally: |
| 127 | + if (runner): |
| 128 | + runner.stop() |
| 129 | + |
| 130 | +if __name__ == "__main__": |
| 131 | + main(sys.argv[1:]) |
0 commit comments