diff --git a/python/__init__.py b/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/agents/__init__.py b/python/agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/agents/navigation/__init__.py b/python/agents/navigation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/agents/navigation/agent.py b/python/agents/navigation/agent.py new file mode 100644 index 0000000..bb580ff --- /dev/null +++ b/python/agents/navigation/agent.py @@ -0,0 +1,234 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module implements an agent that roams around a track following random +waypoints and avoiding other vehicles. +The agent also responds to traffic lights. """ + +import sys +import math + +from enum import Enum + +import carla +from agents.tools.misc import is_within_distance_ahead, is_within_distance, compute_distance + + +class AgentState(Enum): + """ + AGENT_STATE represents the possible states of a roaming agent + """ + NAVIGATING = 1 + BLOCKED_BY_VEHICLE = 2 + BLOCKED_RED_LIGHT = 3 + + +class Agent(object): + """Base class to define agents in CARLA""" + + def __init__(self, vehicle): + """ + Constructor method. + + :param vehicle: actor to apply to local planner logic onto + """ + self._vehicle = vehicle + self._proximity_tlight_threshold = 5.0 # meters + self._proximity_vehicle_threshold = 10.0 # meters + self._local_planner = None + self._world = self._vehicle.get_world() + try: + self._map = self._world.get_map() + except RuntimeError as error: + print('RuntimeError: {}'.format(error)) + print(' The server could not send the OpenDRIVE (.xodr) file:') + print(' Make sure it exists, has the same name of your town, and is correct.') + sys.exit(1) + self._last_traffic_light = None + + def get_local_planner(self): + """Get method for protected member local planner""" + return self._local_planner + + @staticmethod + def run_step(debug=False): + """ + Execute one step of navigation. + + :param debug: boolean flag for debugging + :return: control + """ + control = carla.VehicleControl() + + if debug: + control.steer = 0.0 + control.throttle = 0.0 + control.brake = 0.0 + control.hand_brake = False + control.manual_gear_shift = False + + return control + + def _is_light_red(self, lights_list): + """ + Method to check if there is a red light affecting us. This version of + the method is compatible with both European and US style traffic lights. + + :param lights_list: list containing TrafficLight objects + :return: a tuple given by (bool_flag, traffic_light), where + - bool_flag is True if there is a traffic light in RED + affecting us and False otherwise + - traffic_light is the object itself or None if there is no + red traffic light affecting us + """ + ego_vehicle_location = self._vehicle.get_location() + ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location) + + for traffic_light in lights_list: + object_location = self._get_trafficlight_trigger_location(traffic_light) + object_waypoint = self._map.get_waypoint(object_location) + + if object_waypoint.road_id != ego_vehicle_waypoint.road_id: + continue + + ve_dir = ego_vehicle_waypoint.transform.get_forward_vector() + wp_dir = object_waypoint.transform.get_forward_vector() + dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z + + if dot_ve_wp < 0: + continue + + if is_within_distance_ahead(object_waypoint.transform, + self._vehicle.get_transform(), + self._proximity_tlight_threshold): + if traffic_light.state == carla.TrafficLightState.Red: + return (True, traffic_light) + + return (False, None) + + def _get_trafficlight_trigger_location(self, traffic_light): # pylint: disable=no-self-use + """ + Calculates the yaw of the waypoint that represents the trigger volume of the traffic light + """ + + def rotate_point(point, radians): + """ + rotate a given point by a given angle + """ + rotated_x = math.cos(radians) * point.x - math.sin(radians) * point.y + rotated_y = math.sin(radians) * point.x - math.cos(radians) * point.y + + return carla.Vector3D(rotated_x, rotated_y, point.z) + + base_transform = traffic_light.get_transform() + base_rot = base_transform.rotation.yaw + area_loc = base_transform.transform(traffic_light.trigger_volume.location) + area_ext = traffic_light.trigger_volume.extent + + point = rotate_point(carla.Vector3D(0, 0, area_ext.z), math.radians(base_rot)) + point_location = area_loc + carla.Location(x=point.x, y=point.y) + + return carla.Location(point_location.x, point_location.y, point_location.z) + + def _bh_is_vehicle_hazard(self, ego_wpt, ego_loc, vehicle_list, + proximity_th, up_angle_th, low_angle_th=0, lane_offset=0): + """ + Check if a given vehicle is an obstacle in our way. To this end we take + into account the road and lane the target vehicle is on and run a + geometry test to check if the target vehicle is under a certain distance + in front of our ego vehicle. We also check the next waypoint, just to be + sure there's not a sudden road id change. + + WARNING: This method is an approximation that could fail for very large + vehicles, which center is actually on a different lane but their + extension falls within the ego vehicle lane. Also, make sure to remove + the ego vehicle from the list. Lane offset is set to +1 for right lanes + and -1 for left lanes, but this has to be inverted if lane values are + negative. + + :param ego_wpt: waypoint of ego-vehicle + :param ego_log: location of ego-vehicle + :param vehicle_list: list of potential obstacle to check + :param proximity_th: threshold for the agent to be alerted of + a possible collision + :param up_angle_th: upper threshold for angle + :param low_angle_th: lower threshold for angle + :param lane_offset: for right and left lane changes + :return: a tuple given by (bool_flag, vehicle, distance), where: + - bool_flag is True if there is a vehicle ahead blocking us + and False otherwise + - vehicle is the blocker object itself + - distance is the meters separating the two vehicles + """ + + # Get the right offset + if ego_wpt.lane_id < 0 and lane_offset != 0: + lane_offset *= -1 + + for target_vehicle in vehicle_list: + + target_vehicle_loc = target_vehicle.get_location() + # If the object is not in our next or current lane it's not an obstacle + + target_wpt = self._map.get_waypoint(target_vehicle_loc) + if target_wpt.road_id != ego_wpt.road_id or \ + target_wpt.lane_id != ego_wpt.lane_id + lane_offset: + next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=5)[0] + if target_wpt.road_id != next_wpt.road_id or \ + target_wpt.lane_id != next_wpt.lane_id + lane_offset: + continue + + if is_within_distance(target_vehicle_loc, ego_loc, + self._vehicle.get_transform().rotation.yaw, + proximity_th, up_angle_th, low_angle_th): + return (True, target_vehicle, compute_distance(target_vehicle_loc, ego_loc)) + + return (False, None, -1) + + def _is_vehicle_hazard(self, vehicle_list): + """ + + :param vehicle_list: list of potential obstacle to check + :return: a tuple given by (bool_flag, vehicle), where + - bool_flag is True if there is a vehicle ahead blocking us + and False otherwise + - vehicle is the blocker object itself + """ + + ego_vehicle_location = self._vehicle.get_location() + ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location) + + for target_vehicle in vehicle_list: + # do not account for the ego vehicle + if target_vehicle.id == self._vehicle.id: + continue + + # if the object is not in our lane it's not an obstacle + target_vehicle_waypoint = self._map.get_waypoint(target_vehicle.get_location()) + if target_vehicle_waypoint.road_id != ego_vehicle_waypoint.road_id or \ + target_vehicle_waypoint.lane_id != ego_vehicle_waypoint.lane_id: + continue + + if is_within_distance_ahead(target_vehicle.get_transform(), + self._vehicle.get_transform(), + self._proximity_vehicle_threshold): + return (True, target_vehicle) + + return (False, None) + + @staticmethod + def emergency_stop(): + """ + Send an emergency stop command to the vehicle + + :return: control for braking + """ + control = carla.VehicleControl() + control.steer = 0.0 + control.throttle = 0.0 + control.brake = 1.0 + control.hand_brake = False + + return control diff --git a/python/agents/navigation/basic_agent.py b/python/agents/navigation/basic_agent.py new file mode 100644 index 0000000..2188b4c --- /dev/null +++ b/python/agents/navigation/basic_agent.py @@ -0,0 +1,128 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module implements an agent that roams around a track following random +waypoints and avoiding other vehicles. +The agent also responds to traffic lights. """ + +import carla +from agents.navigation.agent import Agent, AgentState +from agents.navigation.local_planner import LocalPlanner +from agents.navigation.global_route_planner import GlobalRoutePlanner +from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO + + +class BasicAgent(Agent): + """ + BasicAgent implements a basic agent that navigates scenes to reach a given + target destination. This agent respects traffic lights and other vehicles. + """ + + def __init__(self, vehicle, target_speed=20): + """ + + :param vehicle: actor to apply to local planner logic onto + """ + super(BasicAgent, self).__init__(vehicle) + + self._proximity_tlight_threshold = 5.0 # meters + self._proximity_vehicle_threshold = 10.0 # meters + self._state = AgentState.NAVIGATING + args_lateral_dict = { + 'K_P': 1, + 'K_D': 0.4, + 'K_I': 0, + 'dt': 1.0 / 20.0} + self._local_planner = LocalPlanner( + self._vehicle, opt_dict={'target_speed': target_speed, + 'lateral_control_dict': args_lateral_dict}) + self._hop_resolution = 2.0 + self._path_seperation_hop = 2 + self._path_seperation_threshold = 0.5 + self._target_speed = target_speed + self._grp = None + + def set_destination(self, location): + """ + This method creates a list of waypoints from agent's position to destination location + based on the route returned by the global router + """ + + start_waypoint = self._map.get_waypoint(self._vehicle.get_location()) + end_waypoint = self._map.get_waypoint( + carla.Location(location[0], location[1], location[2])) + + route_trace = self._trace_route(start_waypoint, end_waypoint) + + self._local_planner.set_global_plan(route_trace) + + def _trace_route(self, start_waypoint, end_waypoint): + """ + This method sets up a global router and returns the optimal route + from start_waypoint to end_waypoint + """ + + # Setting up global router + if self._grp is None: + dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._hop_resolution) + grp = GlobalRoutePlanner(dao) + grp.setup() + self._grp = grp + + # Obtain route plan + route = self._grp.trace_route( + start_waypoint.transform.location, + end_waypoint.transform.location) + + return route + + def run_step(self, debug=False): + """ + Execute one step of navigation. + :return: carla.VehicleControl + """ + + # is there an obstacle in front of us? + hazard_detected = False + + # retrieve relevant elements for safe navigation, i.e.: traffic lights + # and other vehicles + actor_list = self._world.get_actors() + vehicle_list = actor_list.filter("*vehicle*") + lights_list = actor_list.filter("*traffic_light*") + + # check possible obstacles + vehicle_state, vehicle = self._is_vehicle_hazard(vehicle_list) + if vehicle_state: + if debug: + print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id)) + + self._state = AgentState.BLOCKED_BY_VEHICLE + hazard_detected = True + + # check for the state of the traffic lights + light_state, traffic_light = self._is_light_red(lights_list) + if light_state: + if debug: + print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id)) + + self._state = AgentState.BLOCKED_RED_LIGHT + hazard_detected = True + + if hazard_detected: + control = self.emergency_stop() + else: + self._state = AgentState.NAVIGATING + # standard local planner behavior + control = self._local_planner.run_step(debug=debug) + + return control + + def done(self): + """ + Check whether the agent has reached its destination. + :return bool + """ + return self._local_planner.done() diff --git a/python/agents/navigation/behavior_agent.py b/python/agents/navigation/behavior_agent.py new file mode 100644 index 0000000..3d27f41 --- /dev/null +++ b/python/agents/navigation/behavior_agent.py @@ -0,0 +1,438 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + + +""" This module implements an agent that roams around a track following random +waypoints and avoiding other vehicles. The agent also responds to traffic lights, +traffic signs, and has different possible configurations. """ + +import random +import numpy as np +import carla +from agents.navigation.agent import Agent +from agents.navigation.local_planner_behavior import LocalPlanner, RoadOption +from agents.navigation.global_route_planner import GlobalRoutePlanner +from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO +from agents.navigation.types_behavior import Cautious, Aggressive, Normal + +from agents.tools.misc import get_speed, positive + + +class BehaviorAgent(Agent): + """ + BehaviorAgent implements an agent that navigates scenes to reach a given + target destination, by computing the shortest possible path to it. + This agent can correctly follow traffic signs, speed limitations, + traffic lights, while also taking into account nearby vehicles. Lane changing + decisions can be taken by analyzing the surrounding environment, + such as overtaking or tailgating avoidance. Adding to these are possible + behaviors, the agent can also keep safety distance from a car in front of it + by tracking the instantaneous time to collision and keeping it in a certain range. + Finally, different sets of behaviors are encoded in the agent, from cautious + to a more aggressive ones. + """ + + def __init__(self, vehicle, ignore_traffic_light=True, behavior='normal'): + """ + Constructor method. + + :param vehicle: actor to apply to local planner logic onto + :param ignore_traffic_light: boolean to ignore any traffic light + :param behavior: type of agent to apply + """ + + super(BehaviorAgent, self).__init__(vehicle) + self.vehicle = vehicle + self.ignore_traffic_light = ignore_traffic_light + self._local_planner = LocalPlanner(self) + self._grp = None + self.look_ahead_steps = 0 + + # Vehicle information + self.speed = 0 + self.speed_limit = 0 + self.direction = None + self.incoming_direction = None + self.incoming_waypoint = None + self.start_waypoint = None + self.end_waypoint = None + self.is_at_traffic_light = 0 + self.light_state = "Green" + self.light_id_to_ignore = -1 + self.min_speed = 5 + self.behavior = None + self._sampling_resolution = 4.5 + + # Parameters for agent behavior + if behavior == 'cautious': + self.behavior = Cautious() + + elif behavior == 'normal': + self.behavior = Normal() + + elif behavior == 'aggressive': + self.behavior = Aggressive() + + def update_information(self, vehicle): + """ + This method updates the information regarding the ego + vehicle based on the surrounding world. + + :param world: carla.world object + """ + self.speed = get_speed(self.vehicle) + self.speed_limit = vehicle.get_speed_limit() + self._local_planner.set_speed(self.speed_limit) + self.direction = self._local_planner.target_road_option + if self.direction is None: + self.direction = RoadOption.LANEFOLLOW + + self.look_ahead_steps = int((self.speed_limit) / 10) + + self.incoming_waypoint, self.incoming_direction = self._local_planner.get_incoming_waypoint_and_direction( + steps=self.look_ahead_steps) + if self.incoming_direction is None: + self.incoming_direction = RoadOption.LANEFOLLOW + + self.is_at_traffic_light = vehicle.is_at_traffic_light() + if self.ignore_traffic_light: + self.light_state = "Green" + else: + # This method also includes stop signs and intersections. + self.light_state = str(self.vehicle.get_traffic_light_state()) + + def set_destination(self, start_location, end_location, clean=False): + """ + This method creates a list of waypoints from agent's position to destination location + based on the route returned by the global router. + + :param start_location: initial position + :param end_location: final position + :param clean: boolean to clean the waypoint queue + """ + if clean: + self._local_planner.waypoints_queue.clear() + self.start_waypoint = self._map.get_waypoint(start_location) + self.end_waypoint = self._map.get_waypoint(end_location) + + route_trace = self._trace_route(self.start_waypoint, self.end_waypoint) + + self._local_planner.set_global_plan(route_trace, clean) + + def reroute(self, spawn_points): + """ + This method implements re-routing for vehicles approaching its destination. + It finds a new target and computes another path to reach it. + + :param spawn_points: list of possible destinations for the agent + """ + + print("Target almost reached, setting new destination...") + random.shuffle(spawn_points) + new_start = self._local_planner.waypoints_queue[-1][0].transform.location + destination = spawn_points[0].location if spawn_points[0].location != new_start else spawn_points[1].location + print("New destination: " + str(destination)) + + self.set_destination(new_start, destination) + + def _trace_route(self, start_waypoint, end_waypoint): + """ + This method sets up a global router and returns the + optimal route from start_waypoint to end_waypoint. + + :param start_waypoint: initial position + :param end_waypoint: final position + """ + # Setting up global router + if self._grp is None: + wld = self.vehicle.get_world() + dao = GlobalRoutePlannerDAO( + wld.get_map(), sampling_resolution=self._sampling_resolution) + grp = GlobalRoutePlanner(dao) + grp.setup() + self._grp = grp + + # Obtain route plan + route = self._grp.trace_route( + start_waypoint.transform.location, + end_waypoint.transform.location) + + return route + + def traffic_light_manager(self, waypoint): + """ + This method is in charge of behaviors for red lights and stops. + + WARNING: What follows is a proxy to avoid having a car brake after running a yellow light. + This happens because the car is still under the influence of the semaphore, + even after passing it. So, the semaphore id is temporarely saved to + ignore it and go around this issue, until the car is near a new one. + + :param waypoint: current waypoint of the agent + """ + + light_id = self.vehicle.get_traffic_light().id if self.vehicle.get_traffic_light() is not None else -1 + + if self.light_state == "Red": + if not waypoint.is_junction and (self.light_id_to_ignore != light_id or light_id == -1): + return 1 + elif waypoint.is_junction and light_id != -1: + self.light_id_to_ignore = light_id + if self.light_id_to_ignore != light_id: + self.light_id_to_ignore = -1 + return 0 + + def _overtake(self, location, waypoint, vehicle_list): + """ + This method is in charge of overtaking behaviors. + + :param location: current location of the agent + :param waypoint: current waypoint of the agent + :param vehicle_list: list of all the nearby vehicles + """ + + left_turn = waypoint.left_lane_marking.lane_change + right_turn = waypoint.right_lane_marking.lane_change + + left_wpt = waypoint.get_left_lane() + right_wpt = waypoint.get_right_lane() + + if (left_turn == carla.LaneChange.Left or left_turn == + carla.LaneChange.Both) and waypoint.lane_id * left_wpt.lane_id > 0 and left_wpt.lane_type == carla.LaneType.Driving: + new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=-1) + if not new_vehicle_state: + print("Overtaking to the left!") + self.behavior.overtake_counter = 200 + self.set_destination(left_wpt.transform.location, + self.end_waypoint.transform.location, clean=True) + elif right_turn == carla.LaneChange.Right and waypoint.lane_id * right_wpt.lane_id > 0 and right_wpt.lane_type == carla.LaneType.Driving: + new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=1) + if not new_vehicle_state: + print("Overtaking to the right!") + self.behavior.overtake_counter = 200 + self.set_destination(right_wpt.transform.location, + self.end_waypoint.transform.location, clean=True) + + def _tailgating(self, location, waypoint, vehicle_list): + """ + This method is in charge of tailgating behaviors. + + :param location: current location of the agent + :param waypoint: current waypoint of the agent + :param vehicle_list: list of all the nearby vehicles + """ + + left_turn = waypoint.left_lane_marking.lane_change + right_turn = waypoint.right_lane_marking.lane_change + + left_wpt = waypoint.get_left_lane() + right_wpt = waypoint.get_right_lane() + + behind_vehicle_state, behind_vehicle, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, low_angle_th=160) + if behind_vehicle_state and self.speed < get_speed(behind_vehicle): + if (right_turn == carla.LaneChange.Right or right_turn == + carla.LaneChange.Both) and waypoint.lane_id * right_wpt.lane_id > 0 and right_wpt.lane_type == carla.LaneType.Driving: + new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1) + if not new_vehicle_state: + print("Tailgating, moving to the right!") + self.behavior.tailgate_counter = 200 + self.set_destination(right_wpt.transform.location, + self.end_waypoint.transform.location, clean=True) + elif left_turn == carla.LaneChange.Left and waypoint.lane_id * left_wpt.lane_id > 0 and left_wpt.lane_type == carla.LaneType.Driving: + new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1) + if not new_vehicle_state: + print("Tailgating, moving to the left!") + self.behavior.tailgate_counter = 200 + self.set_destination(left_wpt.transform.location, + self.end_waypoint.transform.location, clean=True) + + def collision_and_car_avoid_manager(self, location, waypoint): + """ + This module is in charge of warning in case of a collision + and managing possible overtaking or tailgating chances. + + :param location: current location of the agent + :param waypoint: current waypoint of the agent + :return vehicle_state: True if there is a vehicle nearby, False if not + :return vehicle: nearby vehicle + :return distance: distance to nearby vehicle + """ + + vehicle_list = self._world.get_actors().filter("*vehicle*") + + def dist(v): + return v.get_location().distance(waypoint.transform.location) + + vehicle_list = [v for v in vehicle_list if dist(v) < 45 and v.id != self.vehicle.id] + + if self.direction == RoadOption.CHANGELANELEFT: + vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard( + waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1) + elif self.direction == RoadOption.CHANGELANERIGHT: + vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard( + waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1) + else: + vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard( + waypoint, location, vehicle_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=30) + + # Check for overtaking + + if vehicle_state and self.direction == RoadOption.LANEFOLLOW and \ + not waypoint.is_junction and self.speed > 10 \ + and self.behavior.overtake_counter == 0 and self.speed > get_speed(vehicle): + self._overtake(location, waypoint, vehicle_list) + + # Check for tailgating + + elif not vehicle_state and self.direction == RoadOption.LANEFOLLOW \ + and not waypoint.is_junction and self.speed > 10 \ + and self.behavior.tailgate_counter == 0: + self._tailgating(location, waypoint, vehicle_list) + + return vehicle_state, vehicle, distance + + def pedestrian_avoid_manager(self, location, waypoint): + """ + This module is in charge of warning in case of a collision + with any pedestrian. + + :param location: current location of the agent + :param waypoint: current waypoint of the agent + :return vehicle_state: True if there is a walker nearby, False if not + :return vehicle: nearby walker + :return distance: distance to nearby walker + """ + + walker_list = self._world.get_actors().filter("*walker.pedestrian*") + + def dist(w): + return w.get_location().distance(waypoint.transform.location) + + walker_list = [w for w in walker_list if dist(w) < 10] + + if self.direction == RoadOption.CHANGELANELEFT: + walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=-1) + elif self.direction == RoadOption.CHANGELANERIGHT: + walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=1) + else: + walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max( + self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=60) + + return walker_state, walker, distance + + def car_following_manager(self, vehicle, distance, debug=False): + """ + Module in charge of car-following behaviors when there's + someone in front of us. + + :param vehicle: car to follow + :param distance: distance from vehicle + :param debug: boolean for debugging + :return control: carla.VehicleControl + """ + + vehicle_speed = get_speed(vehicle) + delta_v = max(1, (self.speed - vehicle_speed) / 3.6) + ttc = distance / delta_v if delta_v != 0 else distance / np.nextafter(0., 1.) + + # Under safety time distance, slow down. + if self.behavior.safety_time > ttc > 0.0: + control = self._local_planner.run_step( + target_speed=min(positive(vehicle_speed - self.behavior.speed_decrease), + min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist)), + debug=debug) + # Actual safety distance area, try to follow the speed of the vehicle in front. + elif 2 * self.behavior.safety_time > ttc >= self.behavior.safety_time: + control = self._local_planner.run_step( + target_speed=min(max(self.min_speed, vehicle_speed), + min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist)), + debug=debug) + # Normal behavior. + else: + control = self._local_planner.run_step( + target_speed=min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist), debug=debug) + + return control + + def run_step(self, debug=False): + """ + Execute one step of navigation. + + :param debug: boolean for debugging + :return control: carla.VehicleControl + """ + control = None + if self.behavior.tailgate_counter > 0: + self.behavior.tailgate_counter -= 1 + if self.behavior.overtake_counter > 0: + self.behavior.overtake_counter -= 1 + + ego_vehicle_loc = self.vehicle.get_location() + ego_vehicle_wp = self._map.get_waypoint(ego_vehicle_loc) + + # 1: Red lights and stops behavior + + if self.traffic_light_manager(ego_vehicle_wp) != 0: + return self.emergency_stop() + + # 2.1: Pedestrian avoidancd behaviors + + walker_state, walker, w_distance = self.pedestrian_avoid_manager( + ego_vehicle_loc, ego_vehicle_wp) + + if walker_state: + # Distance is computed from the center of the two cars, + # we use bounding boxes to calculate the actual distance + distance = w_distance - max( + walker.bounding_box.extent.y, walker.bounding_box.extent.x) - max( + self.vehicle.bounding_box.extent.y, self.vehicle.bounding_box.extent.x) + + # Emergency brake if the car is very close. + if distance < self.behavior.braking_distance: + return self.emergency_stop() + + # 2.2: Car following behaviors + vehicle_state, vehicle, distance = self.collision_and_car_avoid_manager( + ego_vehicle_loc, ego_vehicle_wp) + + if vehicle_state: + # Distance is computed from the center of the two cars, + # we use bounding boxes to calculate the actual distance + distance = distance - max( + vehicle.bounding_box.extent.y, vehicle.bounding_box.extent.x) - max( + self.vehicle.bounding_box.extent.y, self.vehicle.bounding_box.extent.x) + + # Emergency brake if the car is very close. + if distance < self.behavior.braking_distance: + return self.emergency_stop() + else: + control = self.car_following_manager(vehicle, distance) + + # 4: Intersection behavior + + # Checking if there's a junction nearby to slow down + elif self.incoming_waypoint.is_junction and ( + self.incoming_direction == RoadOption.LEFT or self.incoming_direction == RoadOption.RIGHT): + control = self._local_planner.run_step( + target_speed=min(self.behavior.max_speed, self.speed_limit - 5), debug=debug) + + # 5: Normal behavior + + # Calculate controller based on no turn, traffic light or vehicle in front + else: + control = self._local_planner.run_step( + target_speed=min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist), debug=debug) + + return control diff --git a/python/agents/navigation/controller.py b/python/agents/navigation/controller.py new file mode 100644 index 0000000..4ec0a92 --- /dev/null +++ b/python/agents/navigation/controller.py @@ -0,0 +1,215 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module contains PID controllers to perform lateral and longitudinal control. """ + +from collections import deque +import math +import numpy as np +import carla +from agents.tools.misc import get_speed + + +class VehiclePIDController: + """ + VehiclePIDController is the combination of two PID controllers + (lateral and longitudinal) to perform the + low level control a vehicle from client side + """ + + def __init__(self, vehicle, args_lateral, args_longitudinal, max_throttle=0.75, max_brake=0.3, max_steering=0.8): + """ + Constructor method. + + :param vehicle: actor to apply to local planner logic onto + :param args_lateral: dictionary of arguments to set the lateral PID controller + using the following semantics: + K_P -- Proportional term + K_D -- Differential term + K_I -- Integral term + :param args_longitudinal: dictionary of arguments to set the longitudinal + PID controller using the following semantics: + K_P -- Proportional term + K_D -- Differential term + K_I -- Integral term + """ + + self.max_brake = max_brake + self.max_throt = max_throttle + self.max_steer = max_steering + + self._vehicle = vehicle + self._world = self._vehicle.get_world() + self.past_steering = self._vehicle.get_control().steer + self._lon_controller = PIDLongitudinalController(self._vehicle, **args_longitudinal) + self._lat_controller = PIDLateralController(self._vehicle, **args_lateral) + + def run_step(self, target_speed, waypoint): + """ + Execute one step of control invoking both lateral and longitudinal + PID controllers to reach a target waypoint + at a given target_speed. + + :param target_speed: desired vehicle speed + :param waypoint: target location encoded as a waypoint + :return: distance (in meters) to the waypoint + """ + + acceleration = self._lon_controller.run_step(target_speed) + current_steering = self._lat_controller.run_step(waypoint) + control = carla.VehicleControl() + if acceleration >= 0.0: + control.throttle = min(acceleration, self.max_throt) + control.brake = 0.0 + else: + control.throttle = 0.0 + control.brake = min(abs(acceleration), self.max_brake) + + # Steering regulation: changes cannot happen abruptly, can't steer too much. + + if current_steering > self.past_steering + 0.1: + current_steering = self.past_steering + 0.1 + elif current_steering < self.past_steering - 0.1: + current_steering = self.past_steering - 0.1 + + if current_steering >= 0: + steering = min(self.max_steer, current_steering) + else: + steering = max(-self.max_steer, current_steering) + + control.steer = steering + control.hand_brake = False + control.manual_gear_shift = False + self.past_steering = steering + + return control + + +class PIDLongitudinalController: + """ + PIDLongitudinalController implements longitudinal control using a PID. + """ + + def __init__(self, vehicle, K_P=1.0, K_D=0.0, K_I=0.0, dt=0.03): + """ + Constructor method. + + :param vehicle: actor to apply to local planner logic onto + :param K_P: Proportional term + :param K_D: Differential term + :param K_I: Integral term + :param dt: time differential in seconds + """ + self._vehicle = vehicle + self._k_p = K_P + self._k_d = K_D + self._k_i = K_I + self._dt = dt + self._error_buffer = deque(maxlen=10) + + def run_step(self, target_speed, debug=False): + """ + Execute one step of longitudinal control to reach a given target speed. + + :param target_speed: target speed in Km/h + :param debug: boolean for debugging + :return: throttle control + """ + current_speed = get_speed(self._vehicle) + + if debug: + print('Current speed = {}'.format(current_speed)) + + return self._pid_control(target_speed, current_speed) + + def _pid_control(self, target_speed, current_speed): + """ + Estimate the throttle/brake of the vehicle based on the PID equations + + :param target_speed: target speed in Km/h + :param current_speed: current speed of the vehicle in Km/h + :return: throttle/brake control + """ + + error = target_speed - current_speed + self._error_buffer.append(error) + + if len(self._error_buffer) >= 2: + _de = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt + _ie = sum(self._error_buffer) * self._dt + else: + _de = 0.0 + _ie = 0.0 + + return np.clip((self._k_p * error) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0) + + +class PIDLateralController: + """ + PIDLateralController implements lateral control using a PID. + """ + + def __init__(self, vehicle, K_P=1.0, K_D=0.0, K_I=0.0, dt=0.03): + """ + Constructor method. + + :param vehicle: actor to apply to local planner logic onto + :param K_P: Proportional term + :param K_D: Differential term + :param K_I: Integral term + :param dt: time differential in seconds + """ + self._vehicle = vehicle + self._k_p = K_P + self._k_d = K_D + self._k_i = K_I + self._dt = dt + self._e_buffer = deque(maxlen=10) + + def run_step(self, waypoint): + """ + Execute one step of lateral control to steer + the vehicle towards a certain waypoin. + + :param waypoint: target waypoint + :return: steering control in the range [-1, 1] where: + -1 maximum steering to left + +1 maximum steering to right + """ + return self._pid_control(waypoint, self._vehicle.get_transform()) + + def _pid_control(self, waypoint, vehicle_transform): + """ + Estimate the steering angle of the vehicle based on the PID equations + + :param waypoint: target waypoint + :param vehicle_transform: current transform of the vehicle + :return: steering control in the range [-1, 1] + """ + v_begin = vehicle_transform.location + v_end = v_begin + carla.Location(x=math.cos(math.radians(vehicle_transform.rotation.yaw)), + y=math.sin(math.radians(vehicle_transform.rotation.yaw))) + + v_vec = np.array([v_end.x - v_begin.x, v_end.y - v_begin.y, 0.0]) + w_vec = np.array([waypoint.transform.location.x - + v_begin.x, waypoint.transform.location.y - + v_begin.y, 0.0]) + _dot = math.acos(np.clip(np.dot(w_vec, v_vec) / + (np.linalg.norm(w_vec) * np.linalg.norm(v_vec)), -1.0, 1.0)) + + _cross = np.cross(v_vec, w_vec) + + if _cross[2] < 0: + _dot *= -1.0 + + self._e_buffer.append(_dot) + if len(self._e_buffer) >= 2: + _de = (self._e_buffer[-1] - self._e_buffer[-2]) / self._dt + _ie = sum(self._e_buffer) * self._dt + else: + _de = 0.0 + _ie = 0.0 + + return np.clip((self._k_p * _dot) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0) diff --git a/python/agents/navigation/global_route_planner.py b/python/agents/navigation/global_route_planner.py new file mode 100644 index 0000000..1f4696d --- /dev/null +++ b/python/agents/navigation/global_route_planner.py @@ -0,0 +1,387 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + + +""" +This module provides GlobalRoutePlanner implementation. +""" + +import math +import numpy as np +import networkx as nx + +import carla +from agents.navigation.local_planner import RoadOption +from agents.tools.misc import vector + + +class GlobalRoutePlanner(object): + """ + This class provides a very high level route plan. + Instantiate the class by passing a reference to + A GlobalRoutePlannerDAO object. + """ + + def __init__(self, dao): + """ + Constructor + """ + self._dao = dao + self._topology = None + self._graph = None + self._id_map = None + self._road_id_to_edge = None + self._intersection_end_node = -1 + self._previous_decision = RoadOption.VOID + + def setup(self): + """ + Performs initial server data lookup for detailed topology + and builds graph representation of the world map. + """ + self._topology = self._dao.get_topology() + self._graph, self._id_map, self._road_id_to_edge = self._build_graph() + self._find_loose_ends() + self._lane_change_link() + + def _build_graph(self): + """ + This function builds a networkx graph representation of topology. + The topology is read from self._topology. + graph node properties: + vertex - (x,y,z) position in world map + graph edge properties: + entry_vector - unit vector along tangent at entry point + exit_vector - unit vector along tangent at exit point + net_vector - unit vector of the chord from entry to exit + intersection - boolean indicating if the edge belongs to an + intersection + return : graph -> networkx graph representing the world map, + id_map-> mapping from (x,y,z) to node id + road_id_to_edge-> map from road id to edge in the graph + """ + graph = nx.DiGraph() + id_map = dict() # Map with structure {(x,y,z): id, ... } + road_id_to_edge = dict() # Map with structure {road_id: {lane_id: edge, ... }, ... } + + for segment in self._topology: + + entry_xyz, exit_xyz = segment['entryxyz'], segment['exitxyz'] + path = segment['path'] + entry_wp, exit_wp = segment['entry'], segment['exit'] + intersection = entry_wp.is_junction + road_id, section_id, lane_id = entry_wp.road_id, entry_wp.section_id, entry_wp.lane_id + + for vertex in entry_xyz, exit_xyz: + # Adding unique nodes and populating id_map + if vertex not in id_map: + new_id = len(id_map) + id_map[vertex] = new_id + graph.add_node(new_id, vertex=vertex) + n1 = id_map[entry_xyz] + n2 = id_map[exit_xyz] + if road_id not in road_id_to_edge: + road_id_to_edge[road_id] = dict() + if section_id not in road_id_to_edge[road_id]: + road_id_to_edge[road_id][section_id] = dict() + road_id_to_edge[road_id][section_id][lane_id] = (n1, n2) + + entry_carla_vector = entry_wp.transform.rotation.get_forward_vector() + exit_carla_vector = exit_wp.transform.rotation.get_forward_vector() + + # Adding edge with attributes + graph.add_edge( + n1, n2, + length=len(path) + 1, path=path, + entry_waypoint=entry_wp, exit_waypoint=exit_wp, + entry_vector=np.array( + [entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]), + exit_vector=np.array( + [exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]), + net_vector=vector(entry_wp.transform.location, exit_wp.transform.location), + intersection=intersection, type=RoadOption.LANEFOLLOW) + + return graph, id_map, road_id_to_edge + + def _find_loose_ends(self): + """ + This method finds road segments that have an unconnected end, and + adds them to the internal graph representation + """ + count_loose_ends = 0 + hop_resolution = self._dao.get_resolution() + for segment in self._topology: + end_wp = segment['exit'] + exit_xyz = segment['exitxyz'] + road_id, section_id, lane_id = end_wp.road_id, end_wp.section_id, end_wp.lane_id + if road_id in self._road_id_to_edge and section_id in self._road_id_to_edge[road_id] and lane_id in self._road_id_to_edge[road_id][section_id]: + pass + else: + count_loose_ends += 1 + if road_id not in self._road_id_to_edge: + self._road_id_to_edge[road_id] = dict() + if section_id not in self._road_id_to_edge[road_id]: + self._road_id_to_edge[road_id][section_id] = dict() + n1 = self._id_map[exit_xyz] + n2 = -1*count_loose_ends + self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2) + next_wp = end_wp.next(hop_resolution) + path = [] + while next_wp is not None and next_wp and next_wp[0].road_id == road_id and next_wp[0].section_id == section_id and next_wp[0].lane_id == lane_id: + path.append(next_wp[0]) + next_wp = next_wp[0].next(hop_resolution) + if path: + n2_xyz = (path[-1].transform.location.x, + path[-1].transform.location.y, + path[-1].transform.location.z) + self._graph.add_node(n2, vertex=n2_xyz) + self._graph.add_edge( + n1, n2, + length=len(path) + 1, path=path, + entry_waypoint=end_wp, exit_waypoint=path[-1], + entry_vector=None, exit_vector=None, net_vector=None, + intersection=end_wp.is_junction, type=RoadOption.LANEFOLLOW) + + def _localize(self, location): + """ + This function finds the road segment closest to given location + location : carla.Location to be localized in the graph + return : pair node ids representing an edge in the graph + """ + waypoint = self._dao.get_waypoint(location) + edge = None + try: + edge = self._road_id_to_edge[waypoint.road_id][waypoint.section_id][waypoint.lane_id] + except KeyError: + print( + "Failed to localize! : ", + "Road id : ", waypoint.road_id, + "Section id : ", waypoint.section_id, + "Lane id : ", waypoint.lane_id, + "Location : ", waypoint.transform.location.x, + waypoint.transform.location.y) + return edge + + def _lane_change_link(self): + """ + This method places zero cost links in the topology graph + representing availability of lane changes. + """ + + for segment in self._topology: + left_found, right_found = False, False + + for waypoint in segment['path']: + if not segment['entry'].is_junction: + next_waypoint, next_road_option, next_segment = None, None, None + + if waypoint.right_lane_marking.lane_change & carla.LaneChange.Right and not right_found: + next_waypoint = waypoint.get_right_lane() + if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id: + next_road_option = RoadOption.CHANGELANERIGHT + next_segment = self._localize(next_waypoint.transform.location) + if next_segment is not None: + self._graph.add_edge( + self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint, + exit_waypoint=next_waypoint, intersection=False, exit_vector=None, + path=[], length=0, type=next_road_option, change_waypoint=next_waypoint) + right_found = True + if waypoint.left_lane_marking.lane_change & carla.LaneChange.Left and not left_found: + next_waypoint = waypoint.get_left_lane() + if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id: + next_road_option = RoadOption.CHANGELANELEFT + next_segment = self._localize(next_waypoint.transform.location) + if next_segment is not None: + self._graph.add_edge( + self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint, + exit_waypoint=next_waypoint, intersection=False, exit_vector=None, + path=[], length=0, type=next_road_option, change_waypoint=next_waypoint) + left_found = True + if left_found and right_found: + break + + def _distance_heuristic(self, n1, n2): + """ + Distance heuristic calculator for path searching + in self._graph + """ + l1 = np.array(self._graph.nodes[n1]['vertex']) + l2 = np.array(self._graph.nodes[n2]['vertex']) + return np.linalg.norm(l1-l2) + + def _path_search(self, origin, destination): + """ + This function finds the shortest path connecting origin and destination + using A* search with distance heuristic. + origin : carla.Location object of start position + destination : carla.Location object of of end position + return : path as list of node ids (as int) of the graph self._graph + connecting origin and destination + """ + + start, end = self._localize(origin), self._localize(destination) + + route = nx.astar_path( + self._graph, source=start[0], target=end[0], + heuristic=self._distance_heuristic, weight='length') + route.append(end[1]) + return route + + def _successive_last_intersection_edge(self, index, route): + """ + This method returns the last successive intersection edge + from a starting index on the route. + This helps moving past tiny intersection edges to calculate + proper turn decisions. + """ + + last_intersection_edge = None + last_node = None + for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]: + candidate_edge = self._graph.edges[node1, node2] + if node1 == route[index]: + last_intersection_edge = candidate_edge + if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']: + last_intersection_edge = candidate_edge + last_node = node2 + else: + break + + return last_node, last_intersection_edge + + def _turn_decision(self, index, route, threshold=math.radians(35)): + """ + This method returns the turn decision (RoadOption) for pair of edges + around current index of route list + """ + + decision = None + previous_node = route[index-1] + current_node = route[index] + next_node = route[index+1] + next_edge = self._graph.edges[current_node, next_node] + if index > 0: + if self._previous_decision != RoadOption.VOID and self._intersection_end_node > 0 and self._intersection_end_node != previous_node and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']: + decision = self._previous_decision + else: + self._intersection_end_node = -1 + current_edge = self._graph.edges[previous_node, current_node] + calculate_turn = current_edge['type'] == RoadOption.LANEFOLLOW and not current_edge[ + 'intersection'] and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection'] + if calculate_turn: + last_node, tail_edge = self._successive_last_intersection_edge(index, route) + self._intersection_end_node = last_node + if tail_edge is not None: + next_edge = tail_edge + cv, nv = current_edge['exit_vector'], next_edge['exit_vector'] + if cv is None or nv is None: + return next_edge['type'] + cross_list = [] + for neighbor in self._graph.successors(current_node): + select_edge = self._graph.edges[current_node, neighbor] + if select_edge['type'] == RoadOption.LANEFOLLOW: + if neighbor != route[index+1]: + sv = select_edge['net_vector'] + cross_list.append(np.cross(cv, sv)[2]) + next_cross = np.cross(cv, nv)[2] + deviation = math.acos(np.clip( + np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0)) + if not cross_list: + cross_list.append(0) + if deviation < threshold: + decision = RoadOption.STRAIGHT + elif cross_list and next_cross < min(cross_list): + decision = RoadOption.LEFT + elif cross_list and next_cross > max(cross_list): + decision = RoadOption.RIGHT + elif next_cross < 0: + decision = RoadOption.LEFT + elif next_cross > 0: + decision = RoadOption.RIGHT + else: + decision = next_edge['type'] + + else: + decision = next_edge['type'] + + self._previous_decision = decision + return decision + + def abstract_route_plan(self, origin, destination): + """ + The following function generates the route plan based on + origin : carla.Location object of the route's start position + destination : carla.Location object of the route's end position + return : list of turn by turn navigation decisions as + agents.navigation.local_planner.RoadOption elements + Possible values are STRAIGHT, LEFT, RIGHT, LANEFOLLOW, VOID + CHANGELANELEFT, CHANGELANERIGHT + """ + + route = self._path_search(origin, destination) + plan = [] + + for i in range(len(route) - 1): + road_option = self._turn_decision(i, route) + plan.append(road_option) + + return plan + + def _find_closest_in_list(self, current_waypoint, waypoint_list): + min_distance = float('inf') + closest_index = -1 + for i, waypoint in enumerate(waypoint_list): + distance = waypoint.transform.location.distance( + current_waypoint.transform.location) + if distance < min_distance: + min_distance = distance + closest_index = i + + return closest_index + + def trace_route(self, origin, destination): + """ + This method returns list of (carla.Waypoint, RoadOption) + from origin to destination + """ + + route_trace = [] + route = self._path_search(origin, destination) + current_waypoint = self._dao.get_waypoint(origin) + destination_waypoint = self._dao.get_waypoint(destination) + resolution = self._dao.get_resolution() + + for i in range(len(route) - 1): + road_option = self._turn_decision(i, route) + edge = self._graph.edges[route[i], route[i+1]] + path = [] + + if edge['type'] != RoadOption.LANEFOLLOW and edge['type'] != RoadOption.VOID: + route_trace.append((current_waypoint, road_option)) + exit_wp = edge['exit_waypoint'] + n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id] + next_edge = self._graph.edges[n1, n2] + if next_edge['path']: + closest_index = self._find_closest_in_list(current_waypoint, next_edge['path']) + closest_index = min(len(next_edge['path'])-1, closest_index+5) + current_waypoint = next_edge['path'][closest_index] + else: + current_waypoint = next_edge['exit_waypoint'] + route_trace.append((current_waypoint, road_option)) + + else: + path = path + [edge['entry_waypoint']] + edge['path'] + [edge['exit_waypoint']] + closest_index = self._find_closest_in_list(current_waypoint, path) + for waypoint in path[closest_index:]: + current_waypoint = waypoint + route_trace.append((current_waypoint, road_option)) + if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*resolution: + break + elif len(route)-i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id: + destination_index = self._find_closest_in_list(destination_waypoint, path) + if closest_index > destination_index: + break + + return route_trace diff --git a/python/agents/navigation/global_route_planner_dao.py b/python/agents/navigation/global_route_planner_dao.py new file mode 100644 index 0000000..b5dc9b1 --- /dev/null +++ b/python/agents/navigation/global_route_planner_dao.py @@ -0,0 +1,79 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" +This module provides implementation for GlobalRoutePlannerDAO +""" + +import numpy as np + + +class GlobalRoutePlannerDAO(object): + """ + This class is the data access layer for fetching data + from the carla server instance for GlobalRoutePlanner + """ + + def __init__(self, wmap, sampling_resolution): + """ + Constructor method. + + :param wmap: carla.world object + :param sampling_resolution: sampling distance between waypoints + """ + self._sampling_resolution = sampling_resolution + self._wmap = wmap + + def get_topology(self): + """ + Accessor for topology. + This function retrieves topology from the server as a list of + road segments as pairs of waypoint objects, and processes the + topology into a list of dictionary objects. + + :return topology: list of dictionary objects with the following attributes + entry - waypoint of entry point of road segment + entryxyz- (x,y,z) of entry point of road segment + exit - waypoint of exit point of road segment + exitxyz - (x,y,z) of exit point of road segment + path - list of waypoints separated by 1m from entry + to exit + """ + topology = [] + # Retrieving waypoints to construct a detailed topology + for segment in self._wmap.get_topology(): + wp1, wp2 = segment[0], segment[1] + l1, l2 = wp1.transform.location, wp2.transform.location + # Rounding off to avoid floating point imprecision + x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0) + wp1.transform.location, wp2.transform.location = l1, l2 + seg_dict = dict() + seg_dict['entry'], seg_dict['exit'] = wp1, wp2 + seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2) + seg_dict['path'] = [] + endloc = wp2.transform.location + if wp1.transform.location.distance(endloc) > self._sampling_resolution: + w = wp1.next(self._sampling_resolution)[0] + while w.transform.location.distance(endloc) > self._sampling_resolution: + seg_dict['path'].append(w) + w = w.next(self._sampling_resolution)[0] + else: + seg_dict['path'].append(wp1.next(self._sampling_resolution)[0]) + topology.append(seg_dict) + return topology + + def get_waypoint(self, location): + """ + The method returns waypoint at given location + + :param location: vehicle location + :return waypoint: generated waypoint close to location + """ + waypoint = self._wmap.get_waypoint(location) + return waypoint + + def get_resolution(self): + """ Accessor for self._sampling_resolution """ + return self._sampling_resolution diff --git a/python/agents/navigation/local_planner.py b/python/agents/navigation/local_planner.py new file mode 100644 index 0000000..101d382 --- /dev/null +++ b/python/agents/navigation/local_planner.py @@ -0,0 +1,328 @@ +# Copyright (c) # Copyright (c) 2018-2020 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module contains a local planner to perform low-level waypoint following based on PID controllers. """ + +from enum import Enum +from collections import deque +import random + +import carla +from agents.navigation.controller import VehiclePIDController +from agents.tools.misc import draw_waypoints + + +class RoadOption(Enum): + """ + RoadOption represents the possible topological configurations when moving from a segment of lane to other. + """ + VOID = -1 + LEFT = 1 + RIGHT = 2 + STRAIGHT = 3 + LANEFOLLOW = 4 + CHANGELANELEFT = 5 + CHANGELANERIGHT = 6 + + +class LocalPlanner(object): + """ + LocalPlanner implements the basic behavior of following a trajectory of waypoints that is generated on-the-fly. + The low-level motion of the vehicle is computed by using two PID controllers, one is used for the lateral control + and the other for the longitudinal control (cruise speed). + + When multiple paths are available (intersections) this local planner makes a random choice. + """ + + # minimum distance to target waypoint as a percentage (e.g. within 90% of + # total distance) + MIN_DISTANCE_PERCENTAGE = 0.9 + + def __init__(self, vehicle, opt_dict=None): + """ + :param vehicle: actor to apply to local planner logic onto + :param opt_dict: dictionary of arguments with the following semantics: + dt -- time difference between physics control in seconds. This is typically fixed from server side + using the arguments -benchmark -fps=F . In this case dt = 1/F + + target_speed -- desired cruise speed in Km/h + + sampling_radius -- search radius for next waypoints in seconds: e.g. 0.5 seconds ahead + + lateral_control_dict -- dictionary of arguments to setup the lateral PID controller + {'K_P':, 'K_D':, 'K_I':, 'dt'} + + longitudinal_control_dict -- dictionary of arguments to setup the longitudinal PID controller + {'K_P':, 'K_D':, 'K_I':, 'dt'} + """ + self._vehicle = vehicle + self._map = self._vehicle.get_world().get_map() + + self._dt = None + self._target_speed = None + self._sampling_radius = None + self._min_distance = None + self._current_waypoint = None + self._target_road_option = None + self._next_waypoints = None + self.target_waypoint = None + self._vehicle_controller = None + self._global_plan = None + # queue with tuples of (waypoint, RoadOption) + self._waypoints_queue = deque(maxlen=20000) + self._buffer_size = 5 + self._waypoint_buffer = deque(maxlen=self._buffer_size) + + # initializing controller + self._init_controller(opt_dict) + + def __del__(self): + if self._vehicle: + self._vehicle.destroy() + print("Destroying ego-vehicle!") + + def reset_vehicle(self): + self._vehicle = None + print("Resetting ego-vehicle!") + + def _init_controller(self, opt_dict): + """ + Controller initialization. + + :param opt_dict: dictionary of arguments. + :return: + """ + # default params + self._dt = 1.0 / 20.0 + self._target_speed = 20.0 # Km/h + self._sampling_radius = self._target_speed * 1 / 3.6 # 1 seconds horizon + self._min_distance = self._sampling_radius * self.MIN_DISTANCE_PERCENTAGE + self._max_brake = 0.3 + self._max_throt = 0.75 + self._max_steer = 0.8 + args_lateral_dict = { + 'K_P': 1.95, + 'K_D': 0.2, + 'K_I': 0.07, + 'dt': self._dt} + args_longitudinal_dict = { + 'K_P': 1.0, + 'K_D': 0, + 'K_I': 0.05, + 'dt': self._dt} + + # parameters overload + if opt_dict: + if 'dt' in opt_dict: + self._dt = opt_dict['dt'] + if 'target_speed' in opt_dict: + self._target_speed = opt_dict['target_speed'] + if 'sampling_radius' in opt_dict: + self._sampling_radius = self._target_speed * \ + opt_dict['sampling_radius'] / 3.6 + if 'lateral_control_dict' in opt_dict: + args_lateral_dict = opt_dict['lateral_control_dict'] + if 'longitudinal_control_dict' in opt_dict: + args_longitudinal_dict = opt_dict['longitudinal_control_dict'] + if 'max_throttle' in opt_dict: + self._max_throt = opt_dict['max_throttle'] + if 'max_brake' in opt_dict: + self._max_brake = opt_dict['max_brake'] + if 'max_steering' in opt_dict: + self._max_steer = opt_dict['max_steering'] + + self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location()) + self._vehicle_controller = VehiclePIDController(self._vehicle, + args_lateral=args_lateral_dict, + args_longitudinal=args_longitudinal_dict, + max_throttle=self._max_throt, + max_brake=self._max_brake, + max_steering=self._max_steer,) + + self._global_plan = False + + # compute initial waypoints + self._waypoints_queue.append((self._current_waypoint.next(self._sampling_radius)[0], RoadOption.LANEFOLLOW)) + + self._target_road_option = RoadOption.LANEFOLLOW + # fill waypoint trajectory queue + self._compute_next_waypoints(k=200) + + def set_speed(self, speed): + """ + Request new target speed. + + :param speed: new target speed in Km/h + :return: + """ + self._target_speed = speed + + def _compute_next_waypoints(self, k=1): + """ + Add new waypoints to the trajectory queue. + + :param k: how many waypoints to compute + :return: + """ + # check we do not overflow the queue + available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue) + k = min(available_entries, k) + + for _ in range(k): + last_waypoint = self._waypoints_queue[-1][0] + next_waypoints = list(last_waypoint.next(self._sampling_radius)) + + if len(next_waypoints) == 0: + break + elif len(next_waypoints) == 1: + # only one option available ==> lanefollowing + next_waypoint = next_waypoints[0] + road_option = RoadOption.LANEFOLLOW + else: + # random choice between the possible options + road_options_list = _retrieve_options( + next_waypoints, last_waypoint) + road_option = random.choice(road_options_list) + next_waypoint = next_waypoints[road_options_list.index( + road_option)] + + self._waypoints_queue.append((next_waypoint, road_option)) + + def set_global_plan(self, current_plan): + """ + Resets the waypoint queue and buffer to match the new plan. Also + sets the global_plan flag to avoid creating more waypoints + + :param current_plan: list of (carla.Waypoint, RoadOption) + :return: + """ + + # Reset the queue + self._waypoints_queue.clear() + for elem in current_plan: + self._waypoints_queue.append(elem) + self._target_road_option = RoadOption.LANEFOLLOW + + # and the buffer + self._waypoint_buffer.clear() + for _ in range(self._buffer_size): + if self._waypoints_queue: + self._waypoint_buffer.append( + self._waypoints_queue.popleft()) + else: + break + + self._global_plan = True + + def run_step(self, debug=False): + """ + Execute one step of local planning which involves running the longitudinal and lateral PID controllers to + follow the waypoints trajectory. + + :param debug: boolean flag to activate waypoints debugging + :return: control to be applied + """ + + # not enough waypoints in the horizon? => add more! + if not self._global_plan and len(self._waypoints_queue) < int(self._waypoints_queue.maxlen * 0.5): + self._compute_next_waypoints(k=100) + + if len(self._waypoints_queue) == 0 and len(self._waypoint_buffer) == 0: + control = carla.VehicleControl() + control.steer = 0.0 + control.throttle = 0.0 + control.brake = 1.0 + control.hand_brake = False + control.manual_gear_shift = False + + return control + + # Buffering the waypoints + if not self._waypoint_buffer: + for _ in range(self._buffer_size): + if self._waypoints_queue: + self._waypoint_buffer.append( + self._waypoints_queue.popleft()) + else: + break + + # current vehicle waypoint + vehicle_transform = self._vehicle.get_transform() + self._current_waypoint = self._map.get_waypoint(vehicle_transform.location) + # target waypoint + self.target_waypoint, self._target_road_option = self._waypoint_buffer[0] + # move using PID controllers + control = self._vehicle_controller.run_step(self._target_speed, self.target_waypoint) + + # purge the queue of obsolete waypoints + max_index = -1 + + for i, (waypoint, _) in enumerate(self._waypoint_buffer): + if waypoint.transform.location.distance(vehicle_transform.location) < self._min_distance: + max_index = i + if max_index >= 0: + for i in range(max_index + 1): + self._waypoint_buffer.popleft() + + if debug: + draw_waypoints(self._vehicle.get_world(), [self.target_waypoint], self._vehicle.get_location().z + 1.0) + + return control + + def done(self): + """ + Returns whether or not the planner has finished + + :return: boolean + """ + return len(self._waypoints_queue) == 0 and len(self._waypoint_buffer) == 0 + +def _retrieve_options(list_waypoints, current_waypoint): + """ + Compute the type of connection between the current active waypoint and the multiple waypoints present in + list_waypoints. The result is encoded as a list of RoadOption enums. + + :param list_waypoints: list with the possible target waypoints in case of multiple options + :param current_waypoint: current active waypoint + :return: list of RoadOption enums representing the type of connection from the active waypoint to each + candidate in list_waypoints + """ + options = [] + for next_waypoint in list_waypoints: + # this is needed because something we are linking to + # the beggining of an intersection, therefore the + # variation in angle is small + next_next_waypoint = next_waypoint.next(3.0)[0] + link = _compute_connection(current_waypoint, next_next_waypoint) + options.append(link) + + return options + + +def _compute_connection(current_waypoint, next_waypoint, threshold=35): + """ + Compute the type of topological connection between an active waypoint (current_waypoint) and a target waypoint + (next_waypoint). + + :param current_waypoint: active waypoint + :param next_waypoint: target waypoint + :return: the type of topological connection encoded as a RoadOption enum: + RoadOption.STRAIGHT + RoadOption.LEFT + RoadOption.RIGHT + """ + n = next_waypoint.transform.rotation.yaw + n = n % 360.0 + + c = current_waypoint.transform.rotation.yaw + c = c % 360.0 + + diff_angle = (n - c) % 180.0 + if diff_angle < threshold or diff_angle > (180 - threshold): + return RoadOption.STRAIGHT + elif diff_angle > 90.0: + return RoadOption.LEFT + else: + return RoadOption.RIGHT diff --git a/python/agents/navigation/local_planner_behavior.py b/python/agents/navigation/local_planner_behavior.py new file mode 100644 index 0000000..86a6d6b --- /dev/null +++ b/python/agents/navigation/local_planner_behavior.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Intel Labs. +# authors: German Ros (german.ros@intel.com) +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module contains a local planner to perform +low-level waypoint following based on PID controllers. """ + +from collections import deque +from enum import Enum + +import carla +from agents.navigation.controller import VehiclePIDController +from agents.tools.misc import distance_vehicle, draw_waypoints + + +class RoadOption(Enum): + """ + RoadOption represents the possible topological configurations + when moving from a segment of lane to other. + """ + VOID = -1 + LEFT = 1 + RIGHT = 2 + STRAIGHT = 3 + LANEFOLLOW = 4 + CHANGELANELEFT = 5 + CHANGELANERIGHT = 6 + + +class LocalPlanner(object): + """ + LocalPlanner implements the basic behavior of following a trajectory + of waypoints that is generated on-the-fly. + The low-level motion of the vehicle is computed by using two PID controllers, + one is used for the lateral control + and the other for the longitudinal control (cruise speed). + + When multiple paths are available (intersections) + this local planner makes a random choice. + """ + + # Minimum distance to target waypoint as a percentage + # (e.g. within 80% of total distance) + + # FPS used for dt + FPS = 20 + + def __init__(self, agent): + """ + :param agent: agent that regulates the vehicle + :param vehicle: actor to apply to local planner logic onto + """ + self._vehicle = agent.vehicle + self._map = agent.vehicle.get_world().get_map() + + self._target_speed = None + self.sampling_radius = None + self._min_distance = None + self._current_waypoint = None + self.target_road_option = None + self._next_waypoints = None + self.target_waypoint = None + self._vehicle_controller = None + self._global_plan = None + self._pid_controller = None + self.waypoints_queue = deque(maxlen=20000) # queue with tuples of (waypoint, RoadOption) + self._buffer_size = 5 + self._waypoint_buffer = deque(maxlen=self._buffer_size) + + self._init_controller() # initializing controller + + def reset_vehicle(self): + """Reset the ego-vehicle""" + self._vehicle = None + print("Resetting ego-vehicle!") + + def _init_controller(self): + """ + Controller initialization. + + dt -- time difference between physics control in seconds. + This is can be fixed from server side + using the arguments -benchmark -fps=F, since dt = 1/F + + target_speed -- desired cruise speed in km/h + + min_distance -- minimum distance to remove waypoint from queue + + lateral_dict -- dictionary of arguments to setup the lateral PID controller + {'K_P':, 'K_D':, 'K_I':, 'dt'} + + longitudinal_dict -- dictionary of arguments to setup the longitudinal PID controller + {'K_P':, 'K_D':, 'K_I':, 'dt'} + """ + # Default parameters + self.args_lat_hw_dict = { + 'K_P': 0.75, + 'K_D': 0.02, + 'K_I': 0.4, + 'dt': 1.0 / self.FPS} + self.args_lat_city_dict = { + 'K_P': 0.58, + 'K_D': 0.02, + 'K_I': 0.5, + 'dt': 1.0 / self.FPS} + self.args_long_hw_dict = { + 'K_P': 0.37, + 'K_D': 0.024, + 'K_I': 0.032, + 'dt': 1.0 / self.FPS} + self.args_long_city_dict = { + 'K_P': 0.15, + 'K_D': 0.05, + 'K_I': 0.07, + 'dt': 1.0 / self.FPS} + + self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location()) + + self._global_plan = False + + self._target_speed = self._vehicle.get_speed_limit() + + self._min_distance = 3 + + def set_speed(self, speed): + """ + Request new target speed. + + :param speed: new target speed in km/h + """ + + self._target_speed = speed + + def set_global_plan(self, current_plan, clean=False): + """ + Sets new global plan. + + :param current_plan: list of waypoints in the actual plan + """ + for elem in current_plan: + self.waypoints_queue.append(elem) + + if clean: + self._waypoint_buffer.clear() + for _ in range(self._buffer_size): + if self.waypoints_queue: + self._waypoint_buffer.append( + self.waypoints_queue.popleft()) + else: + break + + self._global_plan = True + + def get_incoming_waypoint_and_direction(self, steps=3): + """ + Returns direction and waypoint at a distance ahead defined by the user. + + :param steps: number of steps to get the incoming waypoint. + """ + if len(self.waypoints_queue) > steps: + return self.waypoints_queue[steps] + + else: + try: + wpt, direction = self.waypoints_queue[-1] + return wpt, direction + except IndexError as i: + print(i) + return None, RoadOption.VOID + return None, RoadOption.VOID + + def run_step(self, target_speed=None, debug=False): + """ + Execute one step of local planning which involves + running the longitudinal and lateral PID controllers to + follow the waypoints trajectory. + + :param target_speed: desired speed + :param debug: boolean flag to activate waypoints debugging + :return: control + """ + + if target_speed is not None: + self._target_speed = target_speed + else: + self._target_speed = self._vehicle.get_speed_limit() + + if len(self.waypoints_queue) == 0: + control = carla.VehicleControl() + control.steer = 0.0 + control.throttle = 0.0 + control.brake = 1.0 + control.hand_brake = False + control.manual_gear_shift = False + return control + + # Buffering the waypoints + if not self._waypoint_buffer: + for i in range(self._buffer_size): + if self.waypoints_queue: + self._waypoint_buffer.append( + self.waypoints_queue.popleft()) + else: + break + + # Current vehicle waypoint + self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location()) + + # Target waypoint + self.target_waypoint, self.target_road_option = self._waypoint_buffer[0] + + if target_speed > 50: + args_lat = self.args_lat_hw_dict + args_long = self.args_long_hw_dict + else: + args_lat = self.args_lat_city_dict + args_long = self.args_long_city_dict + + self._pid_controller = VehiclePIDController(self._vehicle, + args_lateral=args_lat, + args_longitudinal=args_long) + + control = self._pid_controller.run_step(self._target_speed, self.target_waypoint) + + # Purge the queue of obsolete waypoints + vehicle_transform = self._vehicle.get_transform() + max_index = -1 + + for i, (waypoint, _) in enumerate(self._waypoint_buffer): + if distance_vehicle( + waypoint, vehicle_transform) < self._min_distance: + max_index = i + if max_index >= 0: + for i in range(max_index + 1): + self._waypoint_buffer.popleft() + + if debug: + draw_waypoints(self._vehicle.get_world(), + [self.target_waypoint], 1.0) + return control diff --git a/python/agents/navigation/roaming_agent.py b/python/agents/navigation/roaming_agent.py new file mode 100644 index 0000000..579afa7 --- /dev/null +++ b/python/agents/navigation/roaming_agent.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Intel Labs. +# authors: German Ros (german.ros@intel.com) +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module implements an agent that roams around a track following random waypoints and avoiding other vehicles. +The agent also responds to traffic lights. """ + +from agents.navigation.agent import Agent, AgentState +from agents.navigation.local_planner import LocalPlanner + + +class RoamingAgent(Agent): + """ + RoamingAgent implements a basic agent that navigates scenes making random + choices when facing an intersection. + + This agent respects traffic lights and other vehicles. + """ + + def __init__(self, vehicle): + """ + + :param vehicle: actor to apply to local planner logic onto + """ + super(RoamingAgent, self).__init__(vehicle) + self._proximity_threshold = 10.0 # meters + self._state = AgentState.NAVIGATING + self._local_planner = LocalPlanner(self._vehicle) + + def run_step(self, debug=False): + """ + Execute one step of navigation. + :return: carla.VehicleControl + """ + + # is there an obstacle in front of us? + hazard_detected = False + + # retrieve relevant elements for safe navigation, i.e.: traffic lights + # and other vehicles + actor_list = self._world.get_actors() + vehicle_list = actor_list.filter("*vehicle*") + lights_list = actor_list.filter("*traffic_light*") + + # check possible obstacles + vehicle_state, vehicle = self._is_vehicle_hazard(vehicle_list) + if vehicle_state: + if debug: + print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id)) + + self._state = AgentState.BLOCKED_BY_VEHICLE + hazard_detected = True + + # check for the state of the traffic lights + light_state, traffic_light = self._is_light_red(lights_list) + if light_state: + if debug: + print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id)) + + self._state = AgentState.BLOCKED_RED_LIGHT + hazard_detected = True + + if hazard_detected: + control = self.emergency_stop() + else: + self._state = AgentState.NAVIGATING + # standard local planner behavior + control = self._local_planner.run_step() + + return control diff --git a/python/agents/navigation/types_behavior.py b/python/agents/navigation/types_behavior.py new file mode 100644 index 0000000..ab3bcbe --- /dev/null +++ b/python/agents/navigation/types_behavior.py @@ -0,0 +1,40 @@ +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module contains the different parameters sets for each behavior. """ + + +class Cautious(object): + """Class for Cautious agent.""" + max_speed = 40 + speed_lim_dist = 6 + speed_decrease = 12 + safety_time = 3 + min_proximity_threshold = 12 + braking_distance = 6 + overtake_counter = -1 + tailgate_counter = 0 + + +class Normal(object): + """Class for Normal agent.""" + max_speed = 50 + speed_lim_dist = 3 + speed_decrease = 10 + safety_time = 3 + min_proximity_threshold = 10 + braking_distance = 5 + overtake_counter = 0 + tailgate_counter = 0 + + +class Aggressive(object): + """Class for Aggressive agent.""" + max_speed = 70 + speed_lim_dist = 1 + speed_decrease = 8 + safety_time = 3 + min_proximity_threshold = 8 + braking_distance = 4 + overtake_counter = 0 + tailgate_counter = -1 diff --git a/python/agents/tools/__init__.py b/python/agents/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/agents/tools/misc.py b/python/agents/tools/misc.py new file mode 100644 index 0000000..f9c3d84 --- /dev/null +++ b/python/agents/tools/misc.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Intel Labs. +# authors: German Ros (german.ros@intel.com) +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" Module with auxiliary functions. """ + +import math +import numpy as np +import carla + +def draw_waypoints(world, waypoints, z=0.5): + """ + Draw a list of waypoints at a certain height given in z. + + :param world: carla.world object + :param waypoints: list or iterable container with the waypoints to draw + :param z: height in meters + """ + for wpt in waypoints: + wpt_t = wpt.transform + begin = wpt_t.location + carla.Location(z=z) + angle = math.radians(wpt_t.rotation.yaw) + end = begin + carla.Location(x=math.cos(angle), y=math.sin(angle)) + world.debug.draw_arrow(begin, end, arrow_size=0.3, life_time=1.0) + + +def get_speed(vehicle): + """ + Compute speed of a vehicle in Km/h. + + :param vehicle: the vehicle for which speed is calculated + :return: speed as a float in Km/h + """ + vel = vehicle.get_velocity() + + return 3.6 * math.sqrt(vel.x ** 2 + vel.y ** 2 + vel.z ** 2) + +def is_within_distance_ahead(target_transform, current_transform, max_distance): + """ + Check if a target object is within a certain distance in front of a reference object. + + :param target_transform: location of the target object + :param current_transform: location of the reference object + :param orientation: orientation of the reference object + :param max_distance: maximum allowed distance + :return: True if target object is within max_distance ahead of the reference object + """ + target_vector = np.array([target_transform.location.x - current_transform.location.x, target_transform.location.y - current_transform.location.y]) + norm_target = np.linalg.norm(target_vector) + + # If the vector is too short, we can simply stop here + if norm_target < 0.001: + return True + + if norm_target > max_distance: + return False + + fwd = current_transform.get_forward_vector() + forward_vector = np.array([fwd.x, fwd.y]) + d_angle = math.degrees(math.acos(np.clip(np.dot(forward_vector, target_vector) / norm_target, -1., 1.))) + + return d_angle < 90.0 + +def is_within_distance(target_location, current_location, orientation, max_distance, d_angle_th_up, d_angle_th_low=0): + """ + Check if a target object is within a certain distance from a reference object. + A vehicle in front would be something around 0 deg, while one behind around 180 deg. + + :param target_location: location of the target object + :param current_location: location of the reference object + :param orientation: orientation of the reference object + :param max_distance: maximum allowed distance + :param d_angle_th_up: upper thereshold for angle + :param d_angle_th_low: low thereshold for angle (optional, default is 0) + :return: True if target object is within max_distance ahead of the reference object + """ + target_vector = np.array([target_location.x - current_location.x, target_location.y - current_location.y]) + norm_target = np.linalg.norm(target_vector) + + # If the vector is too short, we can simply stop here + if norm_target < 0.001: + return True + + if norm_target > max_distance: + return False + + forward_vector = np.array( + [math.cos(math.radians(orientation)), math.sin(math.radians(orientation))]) + d_angle = math.degrees(math.acos(np.clip(np.dot(forward_vector, target_vector) / norm_target, -1., 1.))) + + return d_angle_th_low < d_angle < d_angle_th_up + + +def compute_magnitude_angle(target_location, current_location, orientation): + """ + Compute relative angle and distance between a target_location and a current_location + + :param target_location: location of the target object + :param current_location: location of the reference object + :param orientation: orientation of the reference object + :return: a tuple composed by the distance to the object and the angle between both objects + """ + target_vector = np.array([target_location.x - current_location.x, target_location.y - current_location.y]) + norm_target = np.linalg.norm(target_vector) + + forward_vector = np.array([math.cos(math.radians(orientation)), math.sin(math.radians(orientation))]) + d_angle = math.degrees(math.acos(np.clip(np.dot(forward_vector, target_vector) / norm_target, -1., 1.))) + + return (norm_target, d_angle) + + +def distance_vehicle(waypoint, vehicle_transform): + """ + Returns the 2D distance from a waypoint to a vehicle + + :param waypoint: actual waypoint + :param vehicle_transform: transform of the target vehicle + """ + loc = vehicle_transform.location + x = waypoint.transform.location.x - loc.x + y = waypoint.transform.location.y - loc.y + + return math.sqrt(x * x + y * y) + + +def vector(location_1, location_2): + """ + Returns the unit vector from location_1 to location_2 + + :param location_1, location_2: carla.Location objects + """ + x = location_2.x - location_1.x + y = location_2.y - location_1.y + z = location_2.z - location_1.z + norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps + + return [x / norm, y / norm, z / norm] + + +def compute_distance(location_1, location_2): + """ + Euclidean distance between 3D points + + :param location_1, location_2: 3D points + """ + x = location_2.x - location_1.x + y = location_2.y - location_1.y + z = location_2.z - location_1.z + norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps + return norm + + +def positive(num): + """ + Return the given number if positive, else 0 + + :param num: value to check + """ + return num if num > 0.0 else 0.0 diff --git a/python/automatic_control_revised.py b/python/automatic_control_revised.py new file mode 100755 index 0000000..e623d5e --- /dev/null +++ b/python/automatic_control_revised.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +"""Revised automatic control +""" +# Author: Runsheng Xu +# License: MIT + +import os +import random +import sys + +import carla + +from python.agents.navigation.behavior_agent import BehaviorAgent + + +def main(): + try: + client = carla.Client('localhost', 2000) + client.set_timeout(2.0) + + # Retrieve the world that is currently running + world = client.get_world() + + origin_settings = world.get_settings() + + # set sync mode + settings = world.get_settings() + settings.synchronous_mode = True + settings.fixed_delta_seconds = 0.05 + world.apply_settings(settings) + + blueprint_library = world.get_blueprint_library() + + # read all valid spawn points + all_default_spawn = world.get_map().get_spawn_points() + # randomly choose one as the start point + spawn_point = random.choice(all_default_spawn) if all_default_spawn else carla.Transform() + + # create the blueprint library + ego_vehicle_bp = blueprint_library.find('vehicle.lincoln.mkz2017') + ego_vehicle_bp.set_attribute('color', '0, 0, 0') + # spawn the vehicle + vehicle = world.spawn_actor(ego_vehicle_bp, spawn_point) + + # we need to tick the world once to let the client update the spawn position + world.tick() + + # create the behavior agent + agent = BehaviorAgent(vehicle, behavior='normal') + + # set the destination spot + spawn_points = world.get_map().get_spawn_points() + random.shuffle(spawn_points) + + # to avoid the destination and start position same + if spawn_points[0].location != agent.vehicle.get_location(): + destination = spawn_points[0] + else: + destination = spawn_points[1] + + # generate the route + agent.set_destination(agent.vehicle.get_location(), destination.location, clean=True) + + while True: + agent.update_information(vehicle) + + world.tick() + # top view + spectator = world.get_spectator() + transform = vehicle.get_transform() + spectator.set_transform(carla.Transform(transform.location + carla.Location(z=40), + carla.Rotation(pitch=-90))) + + speed_limit = vehicle.get_speed_limit() + agent.get_local_planner().set_speed(speed_limit) + + control = agent.run_step(debug=True) + vehicle.apply_control(control) + + finally: + world.apply_settings(origin_settings) + vehicle.destroy() + + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print(' - Exited by user.')