Skip to content

Commit 0a7d48c

Browse files
authored
Merge pull request #139 from johnolafenwa/john/multithreading
PERF Improvement on multicore systems
2 parents d6f653e + 665f76c commit 0a7d48c

6 files changed

Lines changed: 622 additions & 505 deletions

File tree

intelligencelayer/shared/detection.py

Lines changed: 93 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import time
99
import warnings
1010
from multiprocessing import Process
11+
from threading import Thread
12+
from queue import Queue
1113

1214
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "."))
1315

@@ -33,112 +35,126 @@
3335

3436
opt = parser.parse_args()
3537

38+
MODE = SharedOptions.MODE
39+
SHARED_APP_DIR = SharedOptions.SHARED_APP_DIR
40+
CUDA_MODE = SharedOptions.CUDA_MODE
41+
db = SharedOptions.db
42+
TEMP_PATH = SharedOptions.TEMP_PATH
3643

37-
def objectdetection(thread_name: str, delay: float):
44+
if opt.name == None:
45+
IMAGE_QUEUE = "detection_queue"
46+
else:
47+
IMAGE_QUEUE = opt.name + "_queue"
3848

39-
MODE = SharedOptions.MODE
40-
SHARED_APP_DIR = SharedOptions.SHARED_APP_DIR
41-
CUDA_MODE = SharedOptions.CUDA_MODE
42-
db = SharedOptions.db
43-
TEMP_PATH = SharedOptions.TEMP_PATH
49+
if opt.model == None:
50+
model_path = os.path.join(
51+
SHARED_APP_DIR, SharedOptions.SETTINGS.DETECTION_MODEL
52+
)
53+
else:
54+
model_path = opt.model
4455

45-
if opt.name == None:
46-
IMAGE_QUEUE = "detection_queue"
47-
else:
48-
IMAGE_QUEUE = opt.name + "_queue"
56+
if MODE == "High":
4957

50-
if opt.model == None:
51-
model_path = os.path.join(
52-
SHARED_APP_DIR, SharedOptions.SETTINGS.DETECTION_MODEL
53-
)
54-
else:
55-
model_path = opt.model
58+
reso = SharedOptions.SETTINGS.DETECTION_HIGH
5659

57-
if MODE == "High":
60+
elif MODE == "Medium":
5861

59-
reso = SharedOptions.SETTINGS.DETECTION_HIGH
62+
reso = SharedOptions.SETTINGS.DETECTION_MEDIUM
6063

61-
elif MODE == "Medium":
64+
elif MODE == "Low":
6265

63-
reso = SharedOptions.SETTINGS.DETECTION_MEDIUM
66+
reso = SharedOptions.SETTINGS.DETECTION_LOW
6467

65-
elif MODE == "Low":
68+
detector = YOLODetector(model_path, reso, cuda=CUDA_MODE)
6669

67-
reso = SharedOptions.SETTINGS.DETECTION_LOW
68-
69-
detector = YOLODetector(model_path, reso, cuda=CUDA_MODE)
70+
def run_task(q):
7071
while True:
71-
queue = db.lrange(IMAGE_QUEUE, 0, 0)
72+
req_data = q.get()
7273

73-
db.ltrim(IMAGE_QUEUE, len(queue), -1)
74+
img_id = req_data["imgid"]
75+
req_id = req_data["reqid"]
76+
req_type = req_data["reqtype"]
77+
threshold = float(req_data["minconfidence"])
78+
img_path = os.path.join(TEMP_PATH, img_id)
7479

75-
if len(queue) > 0:
80+
try:
81+
det = detector.predict(img_path, threshold)
7682

77-
for req_data in queue:
83+
outputs = []
7884

79-
req_data = json.JSONDecoder().decode(req_data)
85+
for *xyxy, conf, cls in reversed(det):
86+
x_min = xyxy[0]
87+
y_min = xyxy[1]
88+
x_max = xyxy[2]
89+
y_max = xyxy[3]
90+
score = conf.item()
91+
92+
label = detector.names[int(cls.item())]
93+
94+
detection = {
95+
"confidence": score,
96+
"label": label,
97+
"x_min": int(x_min),
98+
"y_min": int(y_min),
99+
"x_max": int(x_max),
100+
"y_max": int(y_max),
101+
}
102+
103+
outputs.append(detection)
80104

81-
img_id = req_data["imgid"]
82-
req_id = req_data["reqid"]
83-
req_type = req_data["reqtype"]
84-
threshold = float(req_data["minconfidence"])
85-
img_path = os.path.join(TEMP_PATH, img_id)
105+
output = {"success": True, "predictions": outputs}
86106

87-
try:
88-
det = detector.predict(img_path, threshold)
107+
except UnidentifiedImageError:
108+
err_trace = traceback.format_exc()
109+
print(err_trace, file=sys.stderr, flush=True)
89110

90-
outputs = []
111+
output = {
112+
"success": False,
113+
"error": "invalid image file",
114+
"code": 400,
115+
}
91116

92-
for *xyxy, conf, cls in reversed(det):
93-
x_min = xyxy[0]
94-
y_min = xyxy[1]
95-
x_max = xyxy[2]
96-
y_max = xyxy[3]
97-
score = conf.item()
117+
except Exception:
98118

99-
label = detector.names[int(cls.item())]
119+
err_trace = traceback.format_exc()
120+
print(err_trace, file=sys.stderr, flush=True)
100121

101-
detection = {
102-
"confidence": score,
103-
"label": label,
104-
"x_min": int(x_min),
105-
"y_min": int(y_min),
106-
"x_max": int(x_max),
107-
"y_max": int(y_max),
108-
}
122+
output = {
123+
"success": False,
124+
"error": "error occured on the server",
125+
"code": 500,
126+
}
109127

110-
outputs.append(detection)
128+
finally:
129+
db.set(req_id, json.dumps(output))
130+
if os.path.exists(img_path):
131+
os.remove(img_path)
111132

112-
output = {"success": True, "predictions": outputs}
113133

114-
except UnidentifiedImageError:
115-
err_trace = traceback.format_exc()
116-
print(err_trace, file=sys.stderr, flush=True)
134+
def objectdetection(delay: float):
117135

118-
output = {
119-
"success": False,
120-
"error": "invalid image file",
121-
"code": 400,
122-
}
136+
q = Queue(maxsize=0)
123137

124-
except Exception:
138+
for _ in range(SharedOptions.THREADCOUNT):
139+
worker = Thread(target=run_task, args=(q,))
140+
worker.setDaemon(True)
141+
worker.start()
125142

126-
err_trace = traceback.format_exc()
127-
print(err_trace, file=sys.stderr, flush=True)
143+
while True:
144+
queue = db.lrange(IMAGE_QUEUE, 0, 0)
128145

129-
output = {
130-
"success": False,
131-
"error": "error occured on the server",
132-
"code": 500,
133-
}
146+
db.ltrim(IMAGE_QUEUE, len(queue), -1)
147+
148+
if len(queue) > 0:
149+
150+
for req_data in queue:
151+
152+
req_data = json.JSONDecoder().decode(req_data)
134153

135-
finally:
136-
db.set(req_id, json.dumps(output))
137-
if os.path.exists(img_path):
138-
os.remove(img_path)
154+
q.put(req_data)
139155

140156
time.sleep(delay)
141-
if __name__ == "__main__":
142-
p = Process(target=objectdetection, args=("", SharedOptions.SLEEP_TIME))
143-
p.start()
157+
if __name__ == "__main__":
158+
objectdetection(SharedOptions.SLEEP_TIME)
159+
144160

0 commit comments

Comments
 (0)