# tracker.py

from multiprocessing.connection import Connection

import torch
from colorama import Fore, Style

from src.motion_filter import MotionFilter
from src.frontend import Frontend
from src.backend import Backend
from src.utils.datasets import BaseDataset
from src.utils.Printer import Printer, FontColor


class Tracker:
    def __init__(self, slam, pipe: Connection):
        self.cfg = slam.cfg
        self.device = self.cfg['device']
        self.net = slam.droid_net
        self.video = slam.video
        self.verbose = slam.verbose
        self.pipe = pipe
        self.only_tracking = slam.only_tracking
        self.output = slam.output

        # filter incoming frames so that there is enough motion between keyframes
        self.frontend_window = self.cfg['tracking']['frontend']['window']
        filter_thresh = self.cfg['tracking']['motion_filter']['thresh']
        self.motion_filter = MotionFilter(self.net, self.video, self.cfg,
                                          thresh=filter_thresh, device=self.device)

        self.enable_online_ba = self.cfg['tracking']['frontend']['enable_online_ba']
        # send every k-th keyframe to the mapper
        self.every_kf = self.cfg['mapping']['every_keyframe']

        # frontend for local bundle adjustment, backend for online global BA
        self.frontend = Frontend(self.net, self.video, self.cfg)
        self.online_ba = Backend(self.net, self.video, self.cfg)
        # run online global BA once every `ba_freq` keyframes
        self.ba_freq = self.cfg['tracking']['backend']['ba_freq']

        self.printer: Printer = slam.printer
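
    # A minimal sketch of the `cfg` layout this class expects, inferred from
    # the lookups in __init__ above. The key names are taken from the code;
    # the example values are assumptions, not the project's defaults.
    #
    # cfg = {
    #     'device': 'cuda:0',
    #     'tracking': {
    #         'motion_filter': {'thresh': 4.0},
    #         'frontend': {'window': 25, 'enable_online_ba': True},
    #         'backend': {'ba_freq': 20},
    #     },
    #     'mapping': {'every_keyframe': 1},
    # }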
    def run(self, stream: BaseDataset):
        '''
        Trigger the tracking process.

        1. Check whether there is enough motion between the current frame and
           the last keyframe via the motion filter.
        2. Use the frontend to run local bundle adjustment, estimating the
           camera pose and depth image; the current keyframe is deleted if it
           is too close to the previous keyframe after local BA.
        3. Periodically run online global BA via the backend.
        4. Send the estimated pose and depth to the mapper, and wait until the
           mapper finishes its current mapping optimization.
        '''
        prev_kf_idx = 0
        curr_kf_idx = 0
        prev_ba_idx = 0
        number_of_kf = 0
        intrinsic = stream.get_intrinsic()
        for i in range(len(stream)):
            timestamp, image, _, _ = stream[i]
            with torch.no_grad():
                # check whether there is enough motion; if so, add a keyframe
                self.motion_filter.track(timestamp, image, intrinsic)
                # local bundle adjustment
                self.frontend()
            curr_kf_idx = self.video.counter.value - 1

            if curr_kf_idx != prev_kf_idx and self.frontend.is_initialized:
                number_of_kf += 1
                if self.enable_online_ba and curr_kf_idx >= prev_ba_idx + self.ba_freq:
                    # run online global BA every `ba_freq` keyframes
                    self.printer.print(f"Online BA at {curr_kf_idx}th keyframe, frame index: {timestamp}",
                                       FontColor.TRACKER)
                    self.online_ba.dense_ba(2)
                    prev_ba_idx = curr_kf_idx
                if (not self.only_tracking) and (number_of_kf % self.every_kf == 0):
                    # inform the mapper that the estimation of the current pose
                    # and depth is finished, then block until it acknowledges
                    self.pipe.send({"is_keyframe": True, "video_idx": curr_kf_idx,
                                    "timestamp": timestamp, "end": False})
                    self.pipe.recv()
            prev_kf_idx = curr_kf_idx
            self.printer.update_pbar()

        if not self.only_tracking:
            # signal the mapper that tracking is finished
            self.pipe.send({"is_keyframe": True, "video_idx": None,
                            "timestamp": None, "end": True})