# tracker.py
# NOTE: this listing was copied from a code-hosting web page. The page
# navigation text and the line-number gutter (1-201) that preceded the code
# were extraction residue, not source, and have been removed.
# Original listing size: 201 lines (170 loc), 8.32 KB.
import cv2
import torch
import numpy as np
import math
from deep_sort.utils.parser import get_config
from deep_sort.deep_sort import DeepSort
import hyperlpr3 as lpr3
# Module-level tracker setup: runs once, at import time.
# The YAML path is relative, so it presumably resolves against the process's
# current working directory rather than this file's location - TODO confirm
# how get_config().merge_from_file() resolves paths.
cfg = get_config()
cfg.merge_from_file("../GUI/deep_sort/configs/deep_sort.yaml")
# Single shared DeepSort instance; update() below feeds every frame's
# detections into it. All tuning values come from the DEEPSORT section of the
# merged config.
# NOTE(review): use_cuda is hard-coded to True - confirm the behaviour on a
# CPU-only host.
deepsort = DeepSort(cfg.DEEPSORT.REID_CKPT,
                    max_dist=cfg.DEEPSORT.MAX_DIST, min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE,
                    nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE,
                    max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET,
                    use_cuda=True)
# Shared plate recogniser. It is created on first use and then reused, because
# building a LicensePlateCatcher for every frame repeats the model set-up work.
# NOTE(review): access is not synchronised - confirm draw_bboxes() is only
# called from one thread.
_plate_catcher = None


def _get_plate_catcher():
    """Return the module-wide plate catcher, building it on the first call."""
    global _plate_catcher
    if _plate_catcher is None:
        _plate_catcher = lpr3.LicensePlateCatcher(detect_level=lpr3.DETECT_LEVEL_HIGH)
    return _plate_catcher


def draw_bboxes(i, image, bboxes, speed, line_thickness):
    """
    Draw the tracked boxes on ``image`` and write one result line per box.

    The result file ``./result/num_<i>.txt`` is rewritten on every call: a
    ``Num:`` header holding ``len(bboxes)``, then one tab-separated line per
    box with the track ID, class label, speed value and the first two fields
    of the first plate-recognition result (``0`` and ``0`` when nothing was
    recognised).

    :param i: value used to build the result file name (converted with ``str``)
    :param image: array-like frame; it is drawn on in place and sliced as
        ``image[y1:y2, x1:x2]`` for plate recognition
    :param bboxes: sized iterable of ``(x1, y1, x2, y2, cls_id, pos_id)``
    :param speed: sequence of speed values looked up by box position; when it
        is shorter than ``bboxes`` the last entry is reused, when it is empty
        ``0`` is used
    :param line_thickness: box line thickness; a falsy value selects a
        thickness derived from the image size
    :return: the same ``image`` object, with the drawings applied
    """
    catcher = _get_plate_catcher()
    line_thickness = line_thickness or round(
        0.002 * (image.shape[0] + image.shape[1]) * 0.5) + 1
    font_thickness = max(line_thickness - 1, 1)
    font_scale = line_thickness / 3
    point_radius = 4
    color = (0, 255, 0)
    last_speed_index = len(speed) - 1

    # One handle for the whole frame instead of re-opening the file per box.
    with open(r"./result/num_" + str(i) + ".txt", 'w') as f:
        f.write('Num:\t' + str(len(bboxes)) + '\n')

        for index, (x1, y1, x2, y2, cls_id, pos_id) in enumerate(bboxes):
            # NOTE(review): speeds are matched to boxes purely by position and
            # the last speed is repeated for any extra boxes - confirm that
            # the caller keeps both sequences aligned.
            if len(speed) > 0:
                speed_id = speed[min(index, last_speed_index)]
            else:
                speed_id = 0

            # Line-crossing check point: left edge, 60% of the way down the box.
            check_point_x = x1
            check_point_y = int(y1 + ((y2 - y1) * 0.6))

            c1, c2 = (x1, y1), (x2, y2)
            cv2.rectangle(image, c1, c2, color, thickness=line_thickness, lineType=cv2.LINE_AA)

            # Label background is sized from the class name only, not the full text.
            t_size = cv2.getTextSize(cls_id, 0, fontScale=font_scale, thickness=font_thickness)[0]
            c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
            cv2.rectangle(image, c1, c2, color, -1, cv2.LINE_AA)  # filled

            # These classes are labelled without a speed read-out.
            if cls_id in ('person', 'bicycle', 'motorbicycle'):
                text = '{} ID-{}'.format(cls_id, pos_id)
            else:
                text = '{} ID-{},{}km/h'.format(cls_id, pos_id, speed_id)
            cv2.putText(image, text, (c1[0], c1[1] - 2), 0, font_scale,
                        [225, 255, 255], thickness=font_thickness, lineType=cv2.LINE_AA)

            # Plate recognition on the box crop; a zero-area crop is skipped
            # rather than handed to the recogniser.
            crop = image[y1:y2, x1:x2]
            results = catcher(crop) if crop.size else []
            if results is not None and len(results) > 0:
                # presumably (plate text, confidence) - verify against HyperLPR3
                plate, plate_conf = results[0][0], str(results[0][1])
            else:
                plate, plate_conf = '0', '0'
            f.write('ID' + str(pos_id) + ':\t' + cls_id + '\t' + str(speed_id)
                    + '\t' + plate + '\t' + plate_conf + '\n')

            # Small filled red square marking the check point.
            marker = np.array([
                [check_point_x - point_radius, check_point_y - point_radius],
                [check_point_x - point_radius, check_point_y + point_radius],
                [check_point_x + point_radius, check_point_y + point_radius],
                [check_point_x + point_radius, check_point_y - point_radius],
            ], np.int32)
            cv2.fillPoly(image, [marker], color=(0, 0, 255))

    return image
def update(bboxes, image):
    """
    Feed one frame's detections to the shared DeepSORT tracker.

    Each detection is converted from corner form to ``[center_x, center_y,
    width, height]`` (centers truncated with ``int``), the tracker is updated,
    and every returned track is given the label of the nearest detection via
    ``search_label`` with a threshold of 20.0.

    :param bboxes: sized sequence of ``(x1, y1, x2, y2, lbl, conf)``
    :param image: frame passed through to ``deepsort.update``
    :return: list of ``(x1, y1, x2, y2, label, track_id)``; empty when there
        are no detections, in which case the tracker is not called at all
    """
    tracked = []
    if len(bboxes) <= 0:
        return tracked

    boxes_xywh = []
    scores = []
    for left, top, right, bottom, _lbl, score in bboxes:
        boxes_xywh.append([
            int((left + right) * 0.5),
            int((top + bottom) * 0.5),
            right - left,
            bottom - top,
        ])
        scores.append(score)

    outputs = deepsort.update(torch.Tensor(boxes_xywh), torch.Tensor(scores), image)

    for left, top, right, bottom, track_id in outputs:
        label = search_label(
            center_x=(left + right) * 0.5,
            center_y=(top + bottom) * 0.5,
            bboxes_xyxy=bboxes,
            max_dist_threshold=20.0,
        )
        tracked.append((left, top, right, bottom, label, track_id))

    return tracked
def search_label(center_x, center_y, bboxes_xyxy, max_dist_threshold):
    """
    Search the yolov5 bboxes for the label whose center point is closest.

    A box is a candidate only when both its horizontal and its vertical
    center offset are strictly below ``max_dist_threshold``. Candidates are
    ranked by the mean of the two offsets; on a tie the earlier box wins.

    :param center_x: x coordinate of the query center
    :param center_y: y coordinate of the query center
    :param bboxes_xyxy: iterable of ``(x1, y1, x2, y2, lbl, conf)``
    :param max_dist_threshold: exclusive upper bound for each axis offset
    :return: the winning label, or an empty string when no box qualifies
    """
    best_label = ''
    best_offset = None  # no candidate accepted yet

    for left, top, right, bottom, name, _score in bboxes_xyxy:
        offset_x = abs((left + right) * 0.5 - center_x)
        offset_y = abs((top + bottom) * 0.5 - center_y)

        # Both axis offsets must be inside the tolerance.
        if not (offset_x < max_dist_threshold and offset_y < max_dist_threshold):
            continue

        # Rank by the mean of the two axis offsets; smaller is better.
        mean_offset = (offset_x + offset_y) * 0.5
        if best_offset is None or mean_offset < best_offset:
            best_offset = mean_offset
            best_label = name

    return best_label
def Estimated_speed(locations, fps, width):  # DeepSORT-based vehicle speed estimation
    """
    Estimate a speed value for every track present in two consecutive frames.

    Records are paired by track ID (index 5), so the result is correct even
    when the two frames list their tracks in a different order. Pairing by
    list position, as was done before, compared different vehicles in that
    case.

    The estimate uses a linear mapping from pixel distance to real-world
    distance. When the camera is not perpendicular to the vehicle's path, the
    computed speed is lower than the real one.

    NOTE(review): an older comment described the records as (center x,
    center y, index, class, pixel width), but the code reads the track ID at
    index 5 and divides by index 3 - confirm the record layout with the
    caller.

    :param locations: two-item sequence ``[previous_frame, current_frame]``;
        each frame is a sequence of records where index 0/1 are the position,
        index 3 is the scale divisor and index 5 is the (hashable) track ID
    :param fps: frame rate used as a multiplier in the formula
    :param width: unused; kept for interface compatibility
    :return: list of speeds rounded to one decimal, one per matched track,
        in current-frame order
    """
    print("loca ", locations)
    prev_frame, cur_frame = locations[0], locations[1]

    # Index the previous frame by track ID; the first record wins on duplicates.
    prev_by_id = {}
    for record in prev_frame:
        prev_by_id.setdefault(record[5], record)

    # Keep only tracks seen in both frames, as aligned (current, previous) lists.
    work_locations = []
    work_prev_locations = []
    for record in cur_frame:
        if record[5] in prev_by_id:
            work_locations.append(record)
            work_prev_locations.append(prev_by_id[record[5]])

    print("work_locations: ", work_locations)
    print("work_prev_locations: ", work_prev_locations)

    speed = []
    for current, previous in zip(work_locations, work_prev_locations):
        scale = current[3]
        if scale <= 0:
            # No usable scale: emit a placeholder so positions stay aligned
            # instead of raising on the division or the square root.
            speed.append(0.0)
            continue
        pixel_shift = math.sqrt((current[0] - previous[0]) ** 2 +
                                (current[1] - previous[1]) ** 2)
        # Empirical pixel-to-speed mapping; constants kept exactly as they were.
        ss = pixel_shift * 500 / scale * fps / 5 * 3.6 * 2
        speed.append(round(math.sqrt(ss) * 1.5, 1))
    return speed