diff --git a/DATASETS.md b/DATASETS.md index 3825e0b..23c9c3a 100644 --- a/DATASETS.md +++ b/DATASETS.md @@ -1,7 +1,7 @@ # Supported Datasets and Required Formats ## nuScenes -Nothing special needs to be done for the nuScenes dataset, simply install it as per [the instructions in the devkit README](https://github.com/nutonomy/nuscenes-devkit#nuscenes-setup). +Nothing special needs to be done for the nuScenes dataset, simply download it as per [the instructions in the devkit README](https://github.com/nutonomy/nuscenes-devkit#nuscenes-setup). It should look like this after downloading: ``` @@ -54,8 +54,30 @@ It should look like this after downloading: **Note**: Not all dataset splits need to be downloaded. For example, you can download only the nuPlan Mini Split in case you only need a small sample dataset. +## Waymo Open Motion Dataset +Nothing special needs to be done for the Waymo Open Motion Dataset, simply download v1.1 as per [the instructions on the dataset website](https://waymo.com/intl/en_us/open/download/). + +It should look like this after downloading: +``` +/path/to/waymo/ + ├── training/ + | ├── training.tfrecord-00000-of-01000 + | ├── training.tfrecord-00001-of-01000 + | └── ... + ├── validation/ + │   ├── validation.tfrecord-00000-of-00150 + | ├── validation.tfrecord-00001-of-00150 + | └── ... + └── testing/ + ├── testing.tfrecord-00000-of-00150 + ├── testing.tfrecord-00001-of-00150 + └── ... +``` + +**Note**: Not all the dataset parts need to be downloaded, only the necessary directories in [the Google Cloud Bucket](https://console.cloud.google.com/storage/browser/waymo_open_dataset_motion_v_1_1_0/uncompressed/scenario) need to be downloaded (e.g., `validation` for the validation dataset). + ## Lyft Level 5 -Nothing special needs to be done for the Lyft Level 5 dataset, simply install it as per [the instructions on the dataset website](https://woven-planet.github.io/l5kit/dataset.html). 
+Nothing special needs to be done for the Lyft Level 5 dataset, simply download it as per [the instructions on the dataset website](https://woven-planet.github.io/l5kit/dataset.html). It should look like this after downloading: ``` diff --git a/README.md b/README.md index 53d4c77..aecffd2 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ The easiest way to install trajdata is through PyPI with pip install trajdata ``` -In case you would also like to use datasets such as nuScenes and Lyft Level 5 (which require their own devkits to access raw data), the following will also install the respective devkits. +In case you would also like to use datasets such as nuScenes, Lyft Level 5, or Waymo Open Motion Dataset (which require their own devkits to access raw data), the following will also install the respective devkits. ```sh # For nuScenes pip install "trajdata[nusc]" @@ -21,8 +21,11 @@ pip install "trajdata[nusc]" # For Lyft pip install "trajdata[lyft]" -# Both -pip install "trajdata[nusc,lyft]" +# For Waymo +pip install "trajdata[waymo]" + +# All +pip install "trajdata[nusc,lyft,waymo]" ``` Then, download the raw datasets (nuScenes, Lyft Level 5, ETH/UCY, etc) in case you do not already have them. For more information about how to structure dataset folders/files, please see [`DATASETS.md`](./DATASETS.md). 
@@ -84,9 +87,12 @@ Currently, the dataloader supports interfacing with the following datasets: | Dataset | ID | Splits | Add'l Tags | Description | dt | Maps | |---------|----|--------|------------|-------------|----|------| | nuScenes Train/TrainVal/Val | `nusc_trainval` | `train`, `train_val`, `val` | `boston`, `singapore` | nuScenes prediction challenge training/validation/test splits (500/200/150 scenes) | 0.5s (2Hz) | :white_check_mark: | -| nuScenes Test | `nusc_test` | `test` | `boston`, `singapore` | nuScenes' test split, no annotations (150 scenes) | 0.5s (2Hz) | :white_check_mark: | +| nuScenes Test | `nusc_test` | `test` | `boston`, `singapore` | nuScenes test split, no annotations (150 scenes) | 0.5s (2Hz) | :white_check_mark: | | nuScenes Mini | `nusc_mini` | `mini_train`, `mini_val` | `boston`, `singapore` | nuScenes mini training/validation splits (8/2 scenes) | 0.5s (2Hz) | :white_check_mark: | | nuPlan Mini | `nuplan_mini` | `mini_train`, `mini_val`, `mini_test` | `boston`, `singapore`, `pittsburgh`, `las_vegas` | nuPlan mini training/validation/test splits (942/197/224 scenes) | 0.05s (20Hz) | :white_check_mark: | +| Waymo Open Motion Training | `waymo_train` | `train` | N/A | Waymo Open Motion Dataset `training` split | 0.1s (10Hz) | :white_check_mark: | +| Waymo Open Motion Validation | `waymo_val` | `val` | N/A | Waymo Open Motion Dataset `validation` split | 0.1s (10Hz) | :white_check_mark: | +| Waymo Open Motion Testing | `waymo_test` | `test` | N/A | Waymo Open Motion Dataset `testing` split | 0.1s (10Hz) | :white_check_mark: | | Lyft Level 5 Train | `lyft_train` | `train` | `palo_alto` | Lyft Level 5 training data - part 1/2 (8.4 GB) | 0.1s (10Hz) | :white_check_mark: | | Lyft Level 5 Train Full | `lyft_train_full` | `train` | `palo_alto` | Lyft Level 5 training data - part 2/2 (70 GB) | 0.1s (10Hz) | :white_check_mark: | | Lyft Level 5 Validation | `lyft_val` | `val` | `palo_alto` | Lyft Level 5 validation data (8.2 GB) | 0.1s (10Hz) | 
:white_check_mark: | diff --git a/examples/map_api_example.py b/examples/map_api_example.py index 02ff0f2..759b2e0 100644 --- a/examples/map_api_example.py +++ b/examples/map_api_example.py @@ -162,7 +162,7 @@ def main(): ax.imshow(map_img, alpha=0.5, origin="lower") vec_map.visualize_lane_graph( origin_lane=np.random.randint(0, len(vec_map.lanes)), - num_hops=10, + num_hops=5, raster_from_world=raster_from_world, ax=ax, ) diff --git a/requirements.txt b/requirements.txt index 5d63548..3f40524 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,11 @@ nuscenes-devkit==1.1.9 protobuf==3.19.4 l5kit==1.5.0 +# Waymo devkit +tensorflow==2.11.0 +waymo-open-dataset-tf-2-11-0 +intervaltree + # Development black isort diff --git a/setup.cfg b/setup.cfg index 9eb0525..d07f6d6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = trajdata -version = 1.3.0 +version = 1.3.1 author = Boris Ivanovic author_email = bivanovic@nvidia.com description = A unified interface to many trajectory forecasting datasets. 
@@ -48,3 +48,7 @@ nusc = lyft = protobuf==3.19.4 l5kit==1.5.0 +waymo = + tensorflow==2.11.0 + waymo-open-dataset-tf-2-11-0 + intervaltree diff --git a/src/trajdata/caching/df_cache.py b/src/trajdata/caching/df_cache.py index a21135d..24b8d3a 100644 --- a/src/trajdata/caching/df_cache.py +++ b/src/trajdata/caching/df_cache.py @@ -757,7 +757,7 @@ def finalize_and_cache_map( cache_path, vector_map.env_name, vector_map.map_name, raster_resolution ) - pbar_kwargs = {"position": 2, "leave": False} + pbar_kwargs = {"position": 2, "leave": False, "disable": True} rasterized_map: RasterizedMap = raster_utils.rasterize_map( vector_map, raster_resolution, **pbar_kwargs ) diff --git a/src/trajdata/dataset.py b/src/trajdata/dataset.py index ed0c64f..087240f 100644 --- a/src/trajdata/dataset.py +++ b/src/trajdata/dataset.py @@ -46,7 +46,6 @@ scene_collate_fn, ) from trajdata.dataset_specific import RawDataset -from trajdata.maps import VectorMap from trajdata.maps.map_api import MapAPI from trajdata.parallel import ParallelDatasetPreprocessor, scene_paths_collate_fn from trajdata.utils import agent_utils, env_utils, scene_utils, string_utils @@ -187,7 +186,8 @@ def __init__( self.raster_map_params = ( raster_map_params if raster_map_params is not None - else {"px_per_m": DEFAULT_PX_PER_M} + # Allowing for parallel map processing in case the user specifies num_workers. 
+ else {"px_per_m": DEFAULT_PX_PER_M, "num_workers": num_workers} ) self.incl_vector_map = incl_vector_map self.vector_map_params = ( diff --git a/src/trajdata/dataset_specific/scene_records.py b/src/trajdata/dataset_specific/scene_records.py index 72b7d84..876bff8 100644 --- a/src/trajdata/dataset_specific/scene_records.py +++ b/src/trajdata/dataset_specific/scene_records.py @@ -23,6 +23,12 @@ class LyftSceneRecord(NamedTuple): data_idx: int +class WaymoSceneRecord(NamedTuple): + name: str + length: str + data_idx: int + + class NuPlanSceneRecord(NamedTuple): name: str location: str diff --git a/src/trajdata/dataset_specific/waymo/__init__.py b/src/trajdata/dataset_specific/waymo/__init__.py new file mode 100644 index 0000000..4a43e59 --- /dev/null +++ b/src/trajdata/dataset_specific/waymo/__init__.py @@ -0,0 +1 @@ +from .waymo_dataset import WaymoDataset diff --git a/src/trajdata/dataset_specific/waymo/waymo_dataset.py b/src/trajdata/dataset_specific/waymo/waymo_dataset.py new file mode 100644 index 0000000..47ebc4d --- /dev/null +++ b/src/trajdata/dataset_specific/waymo/waymo_dataset.py @@ -0,0 +1,373 @@ +import os +from collections import defaultdict +from functools import partial +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Type + +import numpy as np +import pandas as pd +import tensorflow as tf +import tqdm +from waymo_open_dataset.protos.scenario_pb2 import Scenario + +from trajdata.caching import EnvCache, SceneCache +from trajdata.data_structures import ( + AgentMetadata, + EnvMetadata, + Scene, + SceneMetadata, + SceneTag, +) +from trajdata.data_structures.agent import ( + Agent, + AgentMetadata, + AgentType, + FixedExtent, + VariableExtent, +) +from trajdata.data_structures.scene_tag import SceneTag +from trajdata.dataset_specific.raw_dataset import RawDataset +from trajdata.dataset_specific.scene_records import WaymoSceneRecord +from trajdata.dataset_specific.waymo import waymo_utils +from 
trajdata.dataset_specific.waymo.waymo_utils import ( + WaymoScenarios, + interpolate_array, + translate_agent_type, +) +from trajdata.maps import VectorMap +from trajdata.proto.vectorized_map_pb2 import ( + MapElement, + PedCrosswalk, + RoadLane, + VectorizedMap, +) +from trajdata.utils import arr_utils +from trajdata.utils.parallel_utils import parallel_apply + + +def const_lambda(const_val: Any) -> Any: + return const_val + + +class WaymoDataset(RawDataset): + def compute_metadata(self, env_name: str, data_dir: str) -> EnvMetadata: + if env_name == "waymo_train": + # Waymo possibilities are the Cartesian product of these + dataset_parts = [("train",)] + scene_split_map = defaultdict(partial(const_lambda, const_val="train")) + + elif env_name == "waymo_val": + # Waymo possibilities are the Cartesian product of these + dataset_parts = [("val",)] + scene_split_map = defaultdict(partial(const_lambda, const_val="val")) + + elif env_name == "waymo_test": + # Waymo possibilities are the Cartesian product of these + dataset_parts = [("test",)] + scene_split_map = defaultdict(partial(const_lambda, const_val="test")) + + return EnvMetadata( + name=env_name, + data_dir=data_dir, + dt=waymo_utils.WAYMO_DT, + parts=dataset_parts, + scene_split_map=scene_split_map, + ) + + def load_dataset_obj(self, verbose: bool = False) -> None: + if verbose: + print(f"Loading {self.name} dataset...", flush=True) + dataset_name: str = "" + if self.name == "waymo_train": + dataset_name = "training" + elif self.name == "waymo_val": + dataset_name = "validation" + elif self.name == "waymo_test": + dataset_name = "testing" + self.dataset_obj = WaymoScenarios( + dataset_name=dataset_name, source_dir=self.metadata.data_dir + ) + + def _get_matching_scenes_from_obj( + self, + scene_tag: SceneTag, + scene_desc_contains: Optional[List[str]], + env_cache: EnvCache, + ) -> List[SceneMetadata]: + all_scenes_list: List[WaymoSceneRecord] = list() + + scenes_list: List[SceneMetadata] = list() + for idx in 
range(self.dataset_obj.num_scenarios): + scene_name: str = "scene_" + str(idx) + scene_split: str = self.metadata.scene_split_map[scene_name] + scene_length: int = self.dataset_obj.scene_length + + # Saving all scene records for later caching. + all_scenes_list.append(WaymoSceneRecord(scene_name, str(scene_length), idx)) + + if scene_split in scene_tag and scene_desc_contains is None: + scene_metadata = SceneMetadata( + env_name=self.metadata.name, + name=scene_name, + dt=self.metadata.dt, + raw_data_idx=idx, + ) + scenes_list.append(scene_metadata) + + self.cache_all_scenes_list(env_cache, all_scenes_list) + return scenes_list + + def get_scene(self, scene_info: SceneMetadata) -> Scene: + _, name, _, data_idx = scene_info + scene_name: str = name + scene_split: str = self.metadata.scene_split_map[scene_name] + scene_length: int = self.dataset_obj.scene_length + + return Scene( + self.metadata, + scene_name, + f"{self.name}_{data_idx}", + scene_split, + scene_length, + data_idx, + None, + ) + + def _get_matching_scenes_from_cache( + self, + scene_tag: SceneTag, + scene_desc_contains: Optional[List[str]], + env_cache: EnvCache, + ) -> List[Scene]: + all_scenes_list: List[WaymoSceneRecord] = env_cache.load_env_scenes_list( + self.name + ) + + scenes_list: List[SceneMetadata] = list() + for scene_record in all_scenes_list: + scene_name, scene_length, data_idx = scene_record + scene_split: str = self.metadata.scene_split_map[scene_name] + + if scene_split in scene_tag and scene_desc_contains is None: + scene_metadata = Scene( + self.metadata, + scene_name, + # Unfortunately necessary as Waymo does not + # associate each scenario with a location. + f"{self.name}_{data_idx}", + scene_split, + scene_length, + data_idx, + None, # This isn't used if everything is already cached. 
+ ) + scenes_list.append(scene_metadata) + + return scenes_list + + def get_agent_info( + self, scene: Scene, cache_path: Path, cache_class: Type[SceneCache] + ) -> Tuple[List[AgentMetadata], List[List[AgentMetadata]]]: + agent_list: List[AgentMetadata] = [] + agent_presence: List[List[AgentMetadata]] = [ + [] for _ in range(scene.length_timesteps) + ] + + dataset = tf.data.TFRecordDataset( + [self.dataset_obj.get_filename(scene.raw_data_idx)], compression_type="" + ) + scenario: Scenario = Scenario() + for data in dataset: + scenario.ParseFromString(bytearray(data.numpy())) + break + + agent_ids = [] + # agent_ml_class = [] + all_agent_data = [] + agents_to_remove = [] + ego_id = None + for index, track in enumerate(scenario.tracks): + agent_type: AgentType = translate_agent_type(track.object_type) + if agent_type == -1: + continue + + agent_id: int = track.id + agent_ids.append(agent_id) + + # agent_ml_class.append(agent_type) + states = track.states + translations = [] + velocities = [] + sizes = [] + yaws = [] + for state in states: + if state.valid: + translations.append( + (state.center_x, state.center_y, state.center_z) + ) + velocities.append((state.velocity_x, state.velocity_y)) + sizes.append((state.length, state.width, state.height)) + yaws.append(state.heading) + else: + translations.append((np.nan, np.nan, np.nan)) + velocities.append((np.nan, np.nan)) + sizes.append((np.nan, np.nan, np.nan)) + yaws.append(np.nan) + + curr_agent_data = np.concatenate( + ( + translations, + velocities, + np.expand_dims(yaws, axis=1), + sizes, + ), + axis=1, + ) + + curr_agent_data = interpolate_array(curr_agent_data) + all_agent_data.append(curr_agent_data) + + first_timestep = pd.Series(curr_agent_data[:, 0]).first_valid_index() + last_timestep = pd.Series(curr_agent_data[:, 0]).last_valid_index() + if first_timestep is None or last_timestep is None: + first_timestep = 0 + last_timestep = 0 + + agent_name = str(agent_id) + if index == scenario.sdc_track_index: + ego_id 
= agent_id + agent_name = "ego" + + agent_info = AgentMetadata( + name=agent_name, + agent_type=agent_type, + first_timestep=first_timestep, + last_timestep=last_timestep, + extent=VariableExtent(), + ) + if last_timestep - first_timestep > 0: + agent_list.append(agent_info) + for timestep in range(first_timestep, last_timestep + 1): + agent_presence[timestep].append(agent_info) + else: + agents_to_remove.append(agent_id) + + # agent_ml_class = np.repeat(agent_ml_class, scene.length_timesteps) + # all_agent_data = np.insert(all_agent_data, 6, agent_ml_class, axis=1) + agent_ids = np.repeat(agent_ids, scene.length_timesteps) + traj_cols = ["x", "y", "z", "vx", "vy", "heading"] + # class_cols = ["class_id"] + extent_cols = ["length", "width", "height"] + # agent_ids has already been repeated, so its length accounts for any + # tracks skipped above (unknown agent types); len(scenario.tracks) would not. + agent_frame_ids = np.resize( + np.arange(scene.length_timesteps), + len(agent_ids), + ) + + all_agent_data_df = pd.DataFrame( + np.concatenate(all_agent_data), + columns=traj_cols + extent_cols, + index=[agent_ids, agent_frame_ids], + ) + + all_agent_data_df.index.names = ["agent_id", "scene_ts"] + + # This does exactly the same as dropna(...), but we're keeping the mask around + # for later use with agent_ids. + mask = pd.notna(all_agent_data_df).all(axis=1, bool_only=False) + all_agent_data_df = all_agent_data_df.loc[mask] + + all_agent_data_df.sort_index(inplace=True) + all_agent_data_df.reset_index(level=1, inplace=True) + + all_agent_data_df[["ax", "ay"]] = ( + arr_utils.agent_aware_diff( + all_agent_data_df[["vx", "vy"]].to_numpy(), agent_ids[mask] + ) + / waymo_utils.WAYMO_DT + ) + final_cols = [ + "x", + "y", + "z", + "vx", + "vy", + "ax", + "ay", + "heading", + ] + extent_cols + + # Removing agents with only one detection.
+ all_agent_data_df.drop(index=agents_to_remove, inplace=True) + + # Changing the agent_id dtype to str + all_agent_data_df.reset_index(inplace=True) + all_agent_data_df["agent_id"] = all_agent_data_df["agent_id"].astype(str) + all_agent_data_df.set_index(["agent_id", "scene_ts"], inplace=True) + all_agent_data_df.rename( + index={str(ego_id): "ego"}, inplace=True, level="agent_id" + ) + + cache_class.save_agent_data( + all_agent_data_df.loc[:, final_cols], + cache_path, + scene, + ) + + tls_dict = waymo_utils.extract_traffic_lights( + dynamic_states=scenario.dynamic_map_states + ) + tls_df = pd.DataFrame( + tls_dict.values(), + index=pd.MultiIndex.from_tuples( + tls_dict.keys(), names=["lane_id", "scene_ts"] + ), + columns=["status"], + ) + cache_class.save_traffic_light_data(tls_df, cache_path, scene) + + return agent_list, agent_presence + + def cache_map( + self, + data_idx: int, + cache_path: Path, + map_cache_class: Type[SceneCache], + map_params: Dict[str, Any], + ): + dataset = tf.data.TFRecordDataset( + [self.dataset_obj.get_filename(data_idx)], compression_type="" + ) + + scenario: Scenario = Scenario() + for data in dataset: + scenario.ParseFromString(bytearray(data.numpy())) + break + + vector_map: VectorMap = waymo_utils.extract_vectorized( + map_features=scenario.map_features, + map_name=f"{self.name}:{self.name}_{data_idx}", + ) + + map_cache_class.finalize_and_cache_map(cache_path, vector_map, map_params) + + def cache_maps( + self, + cache_path: Path, + map_cache_class: Type[SceneCache], + map_params: Dict[str, Any], + ) -> None: + num_workers: int = map_params.get("num_workers", 0) + if num_workers > 1: + parallel_apply( + partial( + self.cache_map, + cache_path=cache_path, + map_cache_class=map_cache_class, + map_params=map_params, + ), + range(self.dataset_obj.num_scenarios), + num_workers=num_workers, + ) + + else: + for i in tqdm.trange(self.dataset_obj.num_scenarios): + self.cache_map(i, cache_path, map_cache_class, map_params) diff --git 
a/src/trajdata/dataset_specific/waymo/waymo_utils.py b/src/trajdata/dataset_specific/waymo/waymo_utils.py new file mode 100644 index 0000000..83cc210 --- /dev/null +++ b/src/trajdata/dataset_specific/waymo/waymo_utils.py @@ -0,0 +1,566 @@ +import os +from pathlib import Path +from subprocess import check_call, check_output +from typing import Dict, Final, List, Optional, Tuple + +import numpy as np +import pandas as pd +import tensorflow as tf +from intervaltree import Interval, IntervalTree +from tqdm import tqdm +from waymo_open_dataset.protos import map_pb2 as waymo_map_pb2 +from waymo_open_dataset.protos import scenario_pb2 + +from trajdata.maps import TrafficLightStatus, VectorMap +from trajdata.maps.vec_map_elements import PedCrosswalk, Polyline, RoadLane + +WAYMO_DT: Final[float] = 0.1 +WAYMO_DATASET_NAMES = [ + "testing", + "testing_interactive", + "training", + "training_20s", + "validation", + "validation_interactive", +] + +TRAIN_SCENE_LENGTH = 91 +VAL_SCENE_LENGTH = 91 +TEST_SCENE_LENGTH = 11 +TRAIN_20S_SCENE_LENGTH = 201 + +GREEN = [ + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_ARROW_GO, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_GO, +] +RED = [ + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_ARROW_CAUTION, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_ARROW_STOP, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_STOP, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_CAUTION, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_FLASHING_STOP, + waymo_map_pb2.TrafficSignalLaneState.State.LANE_STATE_FLASHING_CAUTION, +] + +from trajdata.data_structures.agent import ( + Agent, + AgentMetadata, + AgentType, + FixedExtent, + VariableExtent, +) + + +class WaymoScenarios: + def __init__( + self, + dataset_name: str, + source_dir: Path, + download: bool = False, + split: bool = False, + ): + if dataset_name not in WAYMO_DATASET_NAMES: + raise RuntimeError( + "Wrong dataset name. 
Please choose name from " + + str(WAYMO_DATASET_NAMES) + ) + + self.name = dataset_name + self.source_dir = source_dir + if dataset_name in ["training"]: + self.scene_length = TRAIN_SCENE_LENGTH + elif dataset_name in ["validation", "validation_interactive"]: + self.scene_length = VAL_SCENE_LENGTH + elif dataset_name in ["testing", "testing_interactive"]: + self.scene_length = TEST_SCENE_LENGTH + elif dataset_name in ["training_20s"]: + self.scene_length = TRAIN_20S_SCENE_LENGTH + + if download: + self.download_dataset() + + split_path = self.source_dir / (self.name + "_splitted") + if split or not split_path.is_dir(): + self.split_scenarios() + else: + self.num_scenarios = len(os.listdir(split_path)) + + def download_dataset(self) -> None: + # check_call("snap install google-cloud-sdk --classic".split()) + gsutil = check_output(["which", "gsutil"]) + download_cmd = ( + str(gsutil.decode("utf-8")) + + "-m cp -r gs://waymo_open_dataset_motion_v_1_1_0/uncompressed/scenario/" + + str(self.name) + + " " + + str(self.source_dir) + ).split() + check_call(download_cmd) + + def split_scenarios( + self, num_parallel_reads: int = 20, verbose: bool = True + ) -> None: + source_it: Path = (self.source_dir / self.name).glob("*") + file_names: List[str] = [str(file_name) for file_name in source_it] + if verbose: + print("Loading tfrecord files...") + dataset = tf.data.TFRecordDataset( + file_names, compression_type="", num_parallel_reads=num_parallel_reads + ) + + if verbose: + print("Splitting tfrecords...") + + splitted_dir: Path = self.source_dir / f"{self.name}_splitted" + if not splitted_dir.exists(): + splitted_dir.mkdir(parents=True) + + scenario_num: int = 0 + for data in tqdm(dataset): + file_name: Path = ( + splitted_dir / f"{self.name}_splitted_{scenario_num}.tfrecords" + ) + with tf.io.TFRecordWriter(str(file_name)) as file_writer: + file_writer.write(data.numpy()) + + scenario_num += 1 + + self.num_scenarios = scenario_num + if verbose: + print( + 
str(self.num_scenarios) + + " scenarios from " + + str(len(file_names)) + + " file(s) have been split into " + + str(self.num_scenarios) + + " files." + ) + + def get_filename(self, data_idx): + return ( + self.source_dir + / f"{self.name}_splitted" + / f"{self.name}_splitted_{data_idx}.tfrecords" + ) + + +def extract_vectorized( + map_features: List[waymo_map_pb2.MapFeature], map_name: str, verbose: bool = False +) -> VectorMap: + vec_map = VectorMap(map_id=map_name) + + max_pt = np.array([np.nan, np.nan, np.nan]) + min_pt = np.array([np.nan, np.nan, np.nan]) + + boundaries: Dict[int, Polyline] = {} + for map_feature in tqdm( + map_features, desc="Extracting road boundaries", disable=not verbose + ): + if map_feature.WhichOneof("feature_data") == "road_line": + boundaries[map_feature.id] = Polyline( + np.array([(pt.x, pt.y, pt.z) for pt in map_feature.road_line.polyline]) + ) + elif map_feature.WhichOneof("feature_data") == "road_edge": + boundaries[map_feature.id] = Polyline( + np.array([(pt.x, pt.y, pt.z) for pt in map_feature.road_edge.polyline]) + ) + + lane_id_remap_dict = {} + for map_feature in tqdm( + map_features, desc="Extracting map elements", disable=not verbose + ): + if map_feature.WhichOneof("feature_data") == "lane": + if len(map_feature.lane.polyline) == 1: + # TODO: Why does Waymo have single-point polylines that + # aren't interpolating between others?? 
+ continue + + road_lanes, modified_lane_ids = translate_lane(map_feature, boundaries) + if modified_lane_ids: + lane_id_remap_dict.update(modified_lane_ids) + + for road_lane in road_lanes: + vec_map.add_map_element(road_lane) + + max_pt = np.fmax(max_pt, road_lane.center.xyz.max(axis=0)) + min_pt = np.fmin(min_pt, road_lane.center.xyz.min(axis=0)) + + if road_lane.left_edge: + max_pt = np.fmax(max_pt, road_lane.left_edge.xyz.max(axis=0)) + min_pt = np.fmin(min_pt, road_lane.left_edge.xyz.min(axis=0)) + + if road_lane.right_edge: + max_pt = np.fmax(max_pt, road_lane.right_edge.xyz.max(axis=0)) + min_pt = np.fmin(min_pt, road_lane.right_edge.xyz.min(axis=0)) + + elif map_feature.WhichOneof("feature_data") == "crosswalk": + crosswalk = PedCrosswalk( + id=str(map_feature.id), + polygon=Polyline( + np.array( + [(pt.x, pt.y, pt.z) for pt in map_feature.crosswalk.polygon] + ) + ), + ) + vec_map.add_map_element(crosswalk) + + max_pt = np.fmax(max_pt, crosswalk.polygon.xyz.max(axis=0)) + min_pt = np.fmin(min_pt, crosswalk.polygon.xyz.min(axis=0)) + + else: + continue + + for elem in vec_map.iter_elems(): + if not isinstance(elem, RoadLane): + continue + + to_remove = set() + to_add = set() + for l_id in elem.adj_lanes_left: + if l_id in lane_id_remap_dict: + # Remove the original lanes, replace them with our chunked versions. + to_remove.add(l_id) + to_add.update(lane_id_remap_dict[l_id]) + + elem.adj_lanes_left -= to_remove + elem.adj_lanes_left |= to_add + + to_remove = set() + to_add = set() + for l_id in elem.adj_lanes_right: + if l_id in lane_id_remap_dict: + # Remove the original lanes, replace them with our chunked versions. + to_remove.add(l_id) + to_add.update(lane_id_remap_dict[l_id]) + + elem.adj_lanes_right -= to_remove + elem.adj_lanes_right |= to_add + + to_remove = set() + to_add = set() + for l_id in elem.prev_lanes: + if l_id in lane_id_remap_dict: + # Remove the original prev lanes, replace them with + # the tail of our equivalent chunked version. 
+ to_remove.add(l_id) + to_add.add(lane_id_remap_dict[l_id][-1]) + + elem.prev_lanes -= to_remove + elem.prev_lanes |= to_add + + to_remove = set() + to_add = set() + for l_id in elem.next_lanes: + if l_id in lane_id_remap_dict: + # Remove the original next lanes, replace them with + # the first of our equivalent chunked version. + to_remove.add(l_id) + to_add.add(lane_id_remap_dict[l_id][0]) + + elem.next_lanes -= to_remove + elem.next_lanes |= to_add + + # Setting the map bounds. + # vec_map.extent is [min_x, min_y, min_z, max_x, max_y, max_z] + vec_map.extent = np.concatenate((min_pt, max_pt)) + + return vec_map + + +def translate_agent_type(agent_type): + if agent_type == scenario_pb2.Track.ObjectType.TYPE_VEHICLE: + return AgentType.VEHICLE + elif agent_type == scenario_pb2.Track.ObjectType.TYPE_PEDESTRIAN: + return AgentType.PEDESTRIAN + elif agent_type == scenario_pb2.Track.ObjectType.TYPE_CYCLIST: + return AgentType.BICYCLE + elif agent_type == scenario_pb2.Track.ObjectType.TYPE_OTHER: + return AgentType.UNKNOWN + return -1 + + +def is_full_boundary(lane_boundaries, num_lane_indices: int) -> bool: + """Returns True if a given boundary is connected (there are no gaps) + and every lane center index has a corresponding boundary point.
+ + Returns: + bool + """ + covers_all: bool = lane_boundaries[0].lane_start_index == 0 and lane_boundaries[ + 0 + ].lane_end_index == (num_lane_indices - 1) + for idx in range(1, len(lane_boundaries)): + if ( + lane_boundaries[idx].lane_start_index + != lane_boundaries[idx - 1].lane_end_index + 1 + ): + covers_all = False + break + + return covers_all + + +def _merge_interval_data(data1: str, data2: str) -> str: + if data1 == data2 == "none": + return "none" + + if data1 == "none" and data2 != "none": + return data2 + + if data1 != "none" and data2 == "none": + return data1 + + if data1 != "none" and data2 != "none": + return "both" + + +def split_lane_into_chunks( + lane: waymo_map_pb2.LaneCenter, boundaries: Dict[int, Polyline] +) -> List[Tuple[Polyline, Optional[Polyline], Optional[Polyline]]]: + boundary_intervals = IntervalTree.from_tuples( + [ + (b.lane_start_index, b.lane_end_index + 1, "left") + for b in lane.left_boundaries + ] + + [ + (b.lane_start_index, b.lane_end_index + 1, "right") + for b in lane.right_boundaries + ] + + [(0, len(lane.polyline), "none")] + ) + + boundary_intervals.split_overlaps() + + boundary_intervals.merge_equals(data_reducer=_merge_interval_data) + intervals: List[Interval] = sorted(boundary_intervals) + + if len(intervals) > 1: + merged_intervals: List[Interval] = [intervals.pop(0)] + while intervals: + last_interval: Interval = merged_intervals[-1] + curr_interval: Interval = intervals.pop(0) + + if last_interval.end != curr_interval.begin: + raise ValueError("Non-consecutive intervals in merging!") + + if last_interval.data == curr_interval.data: + # Simple merging of same-data neighbors. + merged_intervals[-1] = Interval( + last_interval.begin, + curr_interval.end, + last_interval.data, + ) + elif ( + last_interval.end - last_interval.begin == 1 + or curr_interval.end - curr_interval.begin == 1 + ): + # Trying to remove 1-length chunks by merging them with neighbors. 
+ data_to_keep: str = ( + curr_interval.data + if curr_interval.end - curr_interval.begin + > last_interval.end - last_interval.begin + else last_interval.data + ) + + merged_intervals[-1] = Interval( + last_interval.begin, + curr_interval.end, + data_to_keep, + ) + else: + merged_intervals.append(curr_interval) + + intervals = merged_intervals + + left_boundary_tree = IntervalTree.from_tuples( + [ + (b.lane_start_index, b.lane_end_index + 1, b.boundary_feature_id) + for b in lane.left_boundaries + ] + ) + right_boundary_tree = IntervalTree.from_tuples( + [ + (b.lane_start_index, b.lane_end_index + 1, b.boundary_feature_id) + for b in lane.right_boundaries + ] + ) + lane_chunk_data: List[Tuple[Polyline, Optional[Polyline], Optional[Polyline]]] = [] + for interval in intervals: + center_chunk = Polyline( + np.array( + [ + (point.x, point.y, point.z) + for point in lane.polyline[interval.begin : interval.end] + ] + ) + ) + if interval.data == "none": + lane_chunk_data.append((center_chunk, None, None)) + elif interval.data == "left": + left_chunk = subselect_boundary( + boundaries, center_chunk, interval, left_boundary_tree + ) + lane_chunk_data.append((center_chunk, left_chunk, None)) + elif interval.data == "right": + right_chunk = subselect_boundary( + boundaries, center_chunk, interval, right_boundary_tree + ) + lane_chunk_data.append((center_chunk, None, right_chunk)) + elif interval.data == "both": + left_chunk = subselect_boundary( + boundaries, center_chunk, interval, left_boundary_tree + ) + right_chunk = subselect_boundary( + boundaries, center_chunk, interval, right_boundary_tree + ) + lane_chunk_data.append((center_chunk, left_chunk, right_chunk)) + else: + raise ValueError() + + return lane_chunk_data + + +def subselect_boundary( + boundaries: Dict[int, Polyline], + lane_center: Polyline, + chunk_interval: Interval, + boundary_tree: IntervalTree, +) -> Polyline: + relevant_boundaries: List[Interval] = sorted( + boundary_tree[chunk_interval.begin : 
def translate_lane(
    map_feature: waymo_map_pb2.MapFeature,
    boundaries: Dict[int, Polyline],
) -> Tuple[List[RoadLane], Optional[Dict[str, List[str]]]]:
    """Convert a Waymo lane MapFeature into one or more trajdata RoadLanes.

    Waymo lane boundaries are... complicated. See
    https://github.com/waymo-research/waymo-open-dataset/issues/389
    for more information. For now, lanes with boundary annotations are split
    into chunks which have consistent lane boundaries (both left and right,
    one of them, or none).

    Args:
        map_feature: Waymo map feature containing a `lane` submessage.
        boundaries: Mapping from boundary feature ID to its polyline.

    Returns:
        A tuple ``(road_lanes, new_id_map)``. ``new_id_map`` is only non-None
        when the lane was split into multiple chunks; it maps the original
        lane ID (as a string) to the new per-chunk lane IDs so that callers
        can reassign neighbor/entry/exit references afterwards.

    Note:
        The original return annotation (``Tuple[RoadLane,
        Optional[Dict[int, List[bytes]]]]``) was incorrect: the function
        returns a list of lanes, and all IDs involved are ``str``.
    """
    lane: waymo_map_pb2.LaneCenter = map_feature.lane

    if lane.left_boundaries or lane.right_boundaries:
        lane_chunks = split_lane_into_chunks(lane, boundaries)
        road_lanes: List[RoadLane] = []
        # Chunk IDs are built with f-strings below, so they are str, not bytes.
        new_ids: List[str] = []
        for idx, (lane_center, left_edge, right_edge) in enumerate(lane_chunks):
            road_lane = RoadLane(
                id=f"{map_feature.id}_{idx}"
                if len(lane_chunks) > 1
                else str(map_feature.id),
                center=lane_center,
                left_edge=left_edge,
                right_edge=right_edge,
            )
            new_ids.append(road_lane.id)

            # Only the first chunk inherits the lane's original entry lanes;
            # interior chunks chain to the preceding chunk of the same lane.
            if idx == 0:
                road_lane.prev_lanes.update([str(eid) for eid in lane.entry_lanes])
            else:
                road_lane.prev_lanes.add(f"{map_feature.id}_{idx-1}")

            # Symmetrically, only the last chunk inherits the exit lanes.
            if idx == len(lane_chunks) - 1:
                road_lane.next_lanes.update([str(eid) for eid in lane.exit_lanes])
            else:
                road_lane.next_lanes.add(f"{map_feature.id}_{idx+1}")

            # We'll take care of reassigning these IDs to the chunked versions later.
            for neighbor in lane.left_neighbors:
                road_lane.adj_lanes_left.add(str(neighbor.feature_id))

            for neighbor in lane.right_neighbors:
                road_lane.adj_lanes_right.add(str(neighbor.feature_id))

            road_lanes.append(road_lane)

        if len(lane_chunks) > 1:
            return road_lanes, {str(map_feature.id): new_ids}
        else:
            return road_lanes, None

    else:
        road_lane = RoadLane(
            id=str(map_feature.id),
            center=Polyline(np.array([(pt.x, pt.y, pt.z) for pt in lane.polyline])),
        )

        road_lane.prev_lanes.update([str(eid) for eid in lane.entry_lanes])
        road_lane.next_lanes.update([str(eid) for eid in lane.exit_lanes])

        for neighbor in lane.left_neighbors:
            road_lane.adj_lanes_left.add(str(neighbor.feature_id))

        for neighbor in lane.right_neighbors:
            road_lane.adj_lanes_right.add(str(neighbor.feature_id))

        return [road_lane], None
def extract_traffic_lights(
    dynamic_states: List[scenario_pb2.DynamicMapState],
) -> Dict[Tuple[str, int], TrafficLightStatus]:
    """Collect the traffic light status of every signaled lane at each timestep.

    Args:
        dynamic_states: Per-timestep dynamic map states of a Waymo scenario.

    Returns:
        Mapping from (lane ID, timestep index) to its TrafficLightStatus.
    """
    return {
        (str(lane_state.lane), timestep): translate_traffic_state(lane_state.state)
        for timestep, dynamic_state in enumerate(dynamic_states)
        for lane_state in dynamic_state.lane_states
    }
def translate_traffic_state(
    state: "waymo_map_pb2.TrafficSignalLaneState.State",
) -> "TrafficLightStatus":
    """Map a Waymo traffic signal state onto trajdata's TrafficLightStatus.

    Annotations are quoted (PEP 484 forward references) so the module can be
    imported in contexts where the Waymo protobufs are typing-only.
    """
    # TODO(bivanovic): The traffic light type doesn't align between waymo and
    # trajdata, since trajdata's TrafficLightStatus does not include a yellow
    # light yet. For now, we set caution = red.
    if state in GREEN:
        return TrafficLightStatus.GREEN
    if state in RED:
        return TrafficLightStatus.RED
    return TrafficLightStatus.UNKNOWN


def interpolate_array(data: List) -> np.ndarray:
    """Linearly interpolate interior NaNs in `data`, column-wise.

    Leading and trailing NaNs are left untouched (limit_area="inside").

    Args:
        data: Sequence of numeric values (1-D, or 2-D given row-wise),
            possibly containing NaNs.

    Returns:
        A 2-D numpy array with interior NaNs filled by linear interpolation.
        (Return annotation fixed: ``np.array`` is a function, not a type;
        the correct type is ``np.ndarray``.)
    """
    return pd.DataFrame(data).interpolate(limit_area="inside").to_numpy()
+ pass + def get_raw_dataset(dataset_name: str, data_dir: str) -> RawDataset: if "nusc" in dataset_name: @@ -41,6 +48,9 @@ def get_raw_dataset(dataset_name: str, data_dir: str) -> RawDataset: if "nuplan" in dataset_name: return NuplanDataset(dataset_name, data_dir, parallelizable=True, has_maps=True) + if "waymo" in dataset_name: + return WaymoDataset(dataset_name, data_dir, parallelizable=True, has_maps=True) + raise ValueError(f"Dataset with name '{dataset_name}' is not supported")