forked from amymcgovern/pyparrot
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDroneManager.py
123 lines (108 loc) · 4.15 KB
/
DroneManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from pyparrot.Bebop import Bebop
from pyparrot.DroneVision import DroneVision
from pyparrot.Model import Model
import subprocess
import time
import psutil
class DroneManager:
    """
    Thin convenience wrapper around a pyparrot ``Bebop`` object.

    It bundles connecting to the drone, launching an external ``ffplay``
    viewer for the video stream, copying the drone's sensor dictionary into
    ``sensor_data`` and shutting everything down again.
    """

    # ffplay invocation kept as an argument list so it is started directly
    # (no shell in between); that way ``Popen.terminate()`` reaches ffplay
    # itself instead of a wrapping shell process.
    _FFPLAY_CMD = (
        "ffplay",
        "-fflags", "nobuffer",
        "-flags", "low_delay",
        "-protocol_whitelist", "file,rtp,udp",
        "-i", "pyparrot/utils/bebop.sdp",
    )

    def __init__(self):
        self.bebop = Bebop()
        # Latest copy of the drone's sensor dictionary (see update_sensor_data).
        self.sensor_data = {}
        # DroneVision instance; set once the video stream has been started.
        self.vision = None
        # Popen handle of the ffplay viewer, or None when no viewer is running.
        self.ffplay_process = None
        # True between a successful start_video_stream() and stop().
        self.running = False

    def connect(self):
        """
        Connect to the Bebop drone.

        ``5`` is passed through to ``Bebop.connect`` (presumably the number
        of retries - confirm against the pyparrot documentation).

        :return: The value returned by ``Bebop.connect`` (truthy on success).
        """
        success = self.bebop.connect(5)
        if success:
            print("Successfully connected to Bebop.")
        else:
            print("Failed to connect to Bebop.")
        return success

    def connect_wifi(self, ssid, password=None):
        """
        Connect to the drone's WiFi network on Windows using ``netsh``.

        The command is passed as an argument list (no shell), so SSIDs or
        passwords containing spaces or shell metacharacters are handed to
        ``netsh`` verbatim instead of being interpreted by the shell.

        :param ssid: The SSID of the WiFi network (also used as profile name).
        :param password: The password for the WiFi network, if any.
        :return: True if ``netsh`` exited with status 0, False otherwise
            (including when ``netsh`` cannot be started at all).
        """
        cmd = ["netsh", "wlan", "connect", f"name={ssid}", f"ssid={ssid}"]
        if password:
            # NOTE(review): the documented ``netsh wlan connect`` parameters
            # are name/ssid/interface only; confirm that ``key=`` is accepted.
            # If not, the key has to be stored in a WLAN profile beforehand.
            cmd.append(f"key={password}")

        try:
            result = subprocess.run(
                cmd,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as e:
            # netsh is missing or not executable (e.g. not running on Windows).
            print(f"Error connecting to {ssid}: {e}")
            return False

        if result.returncode == 0:
            print(f"Successfully connected to {ssid}")
            return True

        # Fall back to stdout when stderr is empty; decode leniently because
        # the console output is not guaranteed to be valid UTF-8.
        raw = result.stderr or result.stdout or b""
        detail = raw.decode("utf-8", errors="replace").strip()
        print(f"Failed to connect to {ssid}: {detail}")
        return False

    def start_video_stream(self):
        """
        Connect to the drone and start the video stream, showing it with
        ffplay in a separate process.

        A failure to launch ffplay is reported but is not fatal: the drone
        stream is still requested and ``running`` is still set, so sensor
        polling keeps working without a viewer window.

        :return: True if the drone connection succeeded and the stream was
            requested, False if the connection failed (nothing is started).
        """
        if not self.connect():
            return False

        self.vision = DroneVision(self.bebop, Model.BEBOP)
        self.bebop.start_video_stream()
        try:
            self.ffplay_process = subprocess.Popen(list(self._FFPLAY_CMD))
        except OSError as e:
            self.ffplay_process = None
            print(f"Could not launch ffplay viewer: {e}")
        else:
            print("Vision successfully started!")
        self.running = True
        return True

    def print_sensor_data(self):
        """
        Print sensor data in a readable format. Call this method in your main loop.

        Does nothing unless ``running`` is set.
        """
        if not self.running:
            return
        sensors = self.bebop.sensors.sensors_dict
        print("Sensor Data:")
        for key, value in sensors.items():
            label = key.replace("Changed_", " ").replace("_", " ")
            print(f"{label}: {value}")
        print("-" * 20)  # Separator for readability

    def _stop_viewer(self):
        """Terminate the ffplay process started by this manager, if any."""
        proc = self.ffplay_process
        if proc is None:
            return
        self.ffplay_process = None
        if proc.poll() is not None:
            return  # viewer already exited (e.g. window closed by the user)
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Viewer ignored the polite request; force it down and reap it.
            proc.kill()
            proc.wait()

    def stop(self):
        """
        Stop the video stream and disconnect.

        Only acts when a video stream was started (``vision`` is set). Only
        the ffplay process launched by this manager is terminated. The drone
        is disconnected and ``running`` is cleared even if one of the
        shutdown steps raises; the exception still propagates afterwards.
        """
        if not self.vision:
            return
        print("Stopping vision and disconnecting.")
        try:
            self._stop_viewer()
            self.vision.close_video()
            self.bebop.stop_video_stream()
        finally:
            self.bebop.disconnect()
            self.running = False
            # Forget the vision object so a second stop() is a no-op.
            self.vision = None

    def update_sensor_data(self):
        """
        Fetches the latest sensor data from the drone and updates the sensor_data dictionary.

        Existing keys are overwritten; keys that are no longer reported by
        the drone are kept with their last value.
        """
        self.sensor_data.update(self.bebop.sensors.sensors_dict)

    def angle_camera(self, tilt, pan):
        """
        Adjust the camera angle of the drone.

        Both values are forwarded unchanged to ``Bebop.pan_tilt_camera``.

        :param tilt: The tilt value to set the camera to.
        :param pan: The pan value to set the camera to.
        """
        self.bebop.pan_tilt_camera(tilt, pan)
if __name__ == "__main__":
    # Demo entry point: start the stream, then poll sensor data until the
    # manager stops running or the user presses Ctrl-C.
    manager = DroneManager()
    try:
        # Start the video stream
        manager.start_video_stream()
        # Main loop
        while manager.running:
            time.sleep(0.1)  # Sleep to prevent excessive CPU usage
            manager.update_sensor_data()
    except KeyboardInterrupt:
        print("Interrupted by user, stopping...")
    finally:
        # Run cleanup on every exit path (normal end, Ctrl-C, or an
        # unexpected error) so the drone connection is not left open.
        manager.stop()