Skip to content

Commit

Permalink
added automatic control example
Browse files Browse the repository at this point in the history
  • Loading branch information
DerrickXuNu committed Mar 8, 2021
1 parent 7173478 commit 884b6eb
Show file tree
Hide file tree
Showing 16 changed files with 2,420 additions and 0 deletions.
Empty file added python/__init__.py
Empty file.
Empty file added python/agents/__init__.py
Empty file.
Empty file.
234 changes: 234 additions & 0 deletions python/agents/navigation/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.

""" This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """

import sys
import math

from enum import Enum

import carla
from agents.tools.misc import is_within_distance_ahead, is_within_distance, compute_distance


class AgentState(Enum):
"""
AGENT_STATE represents the possible states of a roaming agent
"""
NAVIGATING = 1
BLOCKED_BY_VEHICLE = 2
BLOCKED_RED_LIGHT = 3


class Agent(object):
"""Base class to define agents in CARLA"""

def __init__(self, vehicle):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
"""
self._vehicle = vehicle
self._proximity_tlight_threshold = 5.0 # meters
self._proximity_vehicle_threshold = 10.0 # meters
self._local_planner = None
self._world = self._vehicle.get_world()
try:
self._map = self._world.get_map()
except RuntimeError as error:
print('RuntimeError: {}'.format(error))
print(' The server could not send the OpenDRIVE (.xodr) file:')
print(' Make sure it exists, has the same name of your town, and is correct.')
sys.exit(1)
self._last_traffic_light = None

def get_local_planner(self):
"""Get method for protected member local planner"""
return self._local_planner

@staticmethod
def run_step(debug=False):
"""
Execute one step of navigation.
:param debug: boolean flag for debugging
:return: control
"""
control = carla.VehicleControl()

if debug:
control.steer = 0.0
control.throttle = 0.0
control.brake = 0.0
control.hand_brake = False
control.manual_gear_shift = False

return control

def _is_light_red(self, lights_list):
"""
Method to check if there is a red light affecting us. This version of
the method is compatible with both European and US style traffic lights.
:param lights_list: list containing TrafficLight objects
:return: a tuple given by (bool_flag, traffic_light), where
- bool_flag is True if there is a traffic light in RED
affecting us and False otherwise
- traffic_light is the object itself or None if there is no
red traffic light affecting us
"""
ego_vehicle_location = self._vehicle.get_location()
ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location)

for traffic_light in lights_list:
object_location = self._get_trafficlight_trigger_location(traffic_light)
object_waypoint = self._map.get_waypoint(object_location)

if object_waypoint.road_id != ego_vehicle_waypoint.road_id:
continue

ve_dir = ego_vehicle_waypoint.transform.get_forward_vector()
wp_dir = object_waypoint.transform.get_forward_vector()
dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z

if dot_ve_wp < 0:
continue

if is_within_distance_ahead(object_waypoint.transform,
self._vehicle.get_transform(),
self._proximity_tlight_threshold):
if traffic_light.state == carla.TrafficLightState.Red:
return (True, traffic_light)

return (False, None)

def _get_trafficlight_trigger_location(self, traffic_light): # pylint: disable=no-self-use
"""
Calculates the yaw of the waypoint that represents the trigger volume of the traffic light
"""

def rotate_point(point, radians):
"""
rotate a given point by a given angle
"""
rotated_x = math.cos(radians) * point.x - math.sin(radians) * point.y
rotated_y = math.sin(radians) * point.x - math.cos(radians) * point.y

return carla.Vector3D(rotated_x, rotated_y, point.z)

base_transform = traffic_light.get_transform()
base_rot = base_transform.rotation.yaw
area_loc = base_transform.transform(traffic_light.trigger_volume.location)
area_ext = traffic_light.trigger_volume.extent

point = rotate_point(carla.Vector3D(0, 0, area_ext.z), math.radians(base_rot))
point_location = area_loc + carla.Location(x=point.x, y=point.y)

return carla.Location(point_location.x, point_location.y, point_location.z)

def _bh_is_vehicle_hazard(self, ego_wpt, ego_loc, vehicle_list,
proximity_th, up_angle_th, low_angle_th=0, lane_offset=0):
"""
Check if a given vehicle is an obstacle in our way. To this end we take
into account the road and lane the target vehicle is on and run a
geometry test to check if the target vehicle is under a certain distance
in front of our ego vehicle. We also check the next waypoint, just to be
sure there's not a sudden road id change.
WARNING: This method is an approximation that could fail for very large
vehicles, which center is actually on a different lane but their
extension falls within the ego vehicle lane. Also, make sure to remove
the ego vehicle from the list. Lane offset is set to +1 for right lanes
and -1 for left lanes, but this has to be inverted if lane values are
negative.
:param ego_wpt: waypoint of ego-vehicle
:param ego_log: location of ego-vehicle
:param vehicle_list: list of potential obstacle to check
:param proximity_th: threshold for the agent to be alerted of
a possible collision
:param up_angle_th: upper threshold for angle
:param low_angle_th: lower threshold for angle
:param lane_offset: for right and left lane changes
:return: a tuple given by (bool_flag, vehicle, distance), where:
- bool_flag is True if there is a vehicle ahead blocking us
and False otherwise
- vehicle is the blocker object itself
- distance is the meters separating the two vehicles
"""

# Get the right offset
if ego_wpt.lane_id < 0 and lane_offset != 0:
lane_offset *= -1

for target_vehicle in vehicle_list:

target_vehicle_loc = target_vehicle.get_location()
# If the object is not in our next or current lane it's not an obstacle

target_wpt = self._map.get_waypoint(target_vehicle_loc)
if target_wpt.road_id != ego_wpt.road_id or \
target_wpt.lane_id != ego_wpt.lane_id + lane_offset:
next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=5)[0]
if target_wpt.road_id != next_wpt.road_id or \
target_wpt.lane_id != next_wpt.lane_id + lane_offset:
continue

if is_within_distance(target_vehicle_loc, ego_loc,
self._vehicle.get_transform().rotation.yaw,
proximity_th, up_angle_th, low_angle_th):
return (True, target_vehicle, compute_distance(target_vehicle_loc, ego_loc))

return (False, None, -1)

def _is_vehicle_hazard(self, vehicle_list):
"""
:param vehicle_list: list of potential obstacle to check
:return: a tuple given by (bool_flag, vehicle), where
- bool_flag is True if there is a vehicle ahead blocking us
and False otherwise
- vehicle is the blocker object itself
"""

ego_vehicle_location = self._vehicle.get_location()
ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location)

for target_vehicle in vehicle_list:
# do not account for the ego vehicle
if target_vehicle.id == self._vehicle.id:
continue

# if the object is not in our lane it's not an obstacle
target_vehicle_waypoint = self._map.get_waypoint(target_vehicle.get_location())
if target_vehicle_waypoint.road_id != ego_vehicle_waypoint.road_id or \
target_vehicle_waypoint.lane_id != ego_vehicle_waypoint.lane_id:
continue

if is_within_distance_ahead(target_vehicle.get_transform(),
self._vehicle.get_transform(),
self._proximity_vehicle_threshold):
return (True, target_vehicle)

return (False, None)

@staticmethod
def emergency_stop():
"""
Send an emergency stop command to the vehicle
:return: control for braking
"""
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 1.0
control.hand_brake = False

return control
128 changes: 128 additions & 0 deletions python/agents/navigation/basic_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.

""" This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """

import carla
from agents.navigation.agent import Agent, AgentState
from agents.navigation.local_planner import LocalPlanner
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO


class BasicAgent(Agent):
"""
BasicAgent implements a basic agent that navigates scenes to reach a given
target destination. This agent respects traffic lights and other vehicles.
"""

def __init__(self, vehicle, target_speed=20):
"""
:param vehicle: actor to apply to local planner logic onto
"""
super(BasicAgent, self).__init__(vehicle)

self._proximity_tlight_threshold = 5.0 # meters
self._proximity_vehicle_threshold = 10.0 # meters
self._state = AgentState.NAVIGATING
args_lateral_dict = {
'K_P': 1,
'K_D': 0.4,
'K_I': 0,
'dt': 1.0 / 20.0}
self._local_planner = LocalPlanner(
self._vehicle, opt_dict={'target_speed': target_speed,
'lateral_control_dict': args_lateral_dict})
self._hop_resolution = 2.0
self._path_seperation_hop = 2
self._path_seperation_threshold = 0.5
self._target_speed = target_speed
self._grp = None

def set_destination(self, location):
"""
This method creates a list of waypoints from agent's position to destination location
based on the route returned by the global router
"""

start_waypoint = self._map.get_waypoint(self._vehicle.get_location())
end_waypoint = self._map.get_waypoint(
carla.Location(location[0], location[1], location[2]))

route_trace = self._trace_route(start_waypoint, end_waypoint)

self._local_planner.set_global_plan(route_trace)

def _trace_route(self, start_waypoint, end_waypoint):
"""
This method sets up a global router and returns the optimal route
from start_waypoint to end_waypoint
"""

# Setting up global router
if self._grp is None:
dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._hop_resolution)
grp = GlobalRoutePlanner(dao)
grp.setup()
self._grp = grp

# Obtain route plan
route = self._grp.trace_route(
start_waypoint.transform.location,
end_waypoint.transform.location)

return route

def run_step(self, debug=False):
"""
Execute one step of navigation.
:return: carla.VehicleControl
"""

# is there an obstacle in front of us?
hazard_detected = False

# retrieve relevant elements for safe navigation, i.e.: traffic lights
# and other vehicles
actor_list = self._world.get_actors()
vehicle_list = actor_list.filter("*vehicle*")
lights_list = actor_list.filter("*traffic_light*")

# check possible obstacles
vehicle_state, vehicle = self._is_vehicle_hazard(vehicle_list)
if vehicle_state:
if debug:
print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id))

self._state = AgentState.BLOCKED_BY_VEHICLE
hazard_detected = True

# check for the state of the traffic lights
light_state, traffic_light = self._is_light_red(lights_list)
if light_state:
if debug:
print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id))

self._state = AgentState.BLOCKED_RED_LIGHT
hazard_detected = True

if hazard_detected:
control = self.emergency_stop()
else:
self._state = AgentState.NAVIGATING
# standard local planner behavior
control = self._local_planner.run_step(debug=debug)

return control

def done(self):
"""
Check whether the agent has reached its destination.
:return bool
"""
return self._local_planner.done()
Loading

0 comments on commit 884b6eb

Please sign in to comment.