forked from spehj/yolov7-counter-jetson-nano
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo_threading.py
65 lines (52 loc) · 2.02 KB
/
demo_threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import threading
import cv2
import time
from queue import Queue
class InferThread(threading.Thread):
def __init__(self, frame_queue, max_queue_size):
super().__init__()
self.frame_queue = frame_queue
self.max_queue_size = max_queue_size
def run(self):
cap = cv2.VideoCapture('pigs-trimmed-h264-1080p.mov')
while True:
if self.frame_queue.qsize() < self.max_queue_size:
ret, frame = cap.read()
print("Reading ...")
if not ret:
break
# This code simulates preprocessing and inference
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
self.frame_queue.put(gray_frame) # Add frame to the queue
time.sleep(0.1)
#################################################
else:
print("SLEEPING")
time.sleep(0.1) # Wait if queue is full
cap.release()
class DisplayThread(threading.Thread):
def __init__(self, frame_queue):
super().__init__()
self.frame_queue = frame_queue
def run(self):
while True:
frame = self.frame_queue.get() # Get frame from the queue
# This code simulates postprocessing, tracking and inference
blurred_frame = cv2.GaussianBlur(frame, (5, 5), 0)
time.sleep(0.3)
############################################################
print("Displaying ...")
cv2.imshow('Display', blurred_frame)
cv2.waitKey(1)
self.frame_queue.task_done()
if __name__ == '__main__':
max_queue_size = 10 # Maximum size of the frame queue
frame_queue = Queue(maxsize=max_queue_size)
infer_thread = InferThread(frame_queue, max_queue_size)
display_thread = DisplayThread(frame_queue)
infer_thread.start()
display_thread.start()
infer_thread.join()
frame_queue.join()
display_thread.join()
cv2.destroyAllWindows()