[data] [streaming] [part 3/n] Rename Dataset => Datastream in internal files (ray-project#34340)
ericl authored Apr 14, 2023
1 parent 0100e64 commit 6d69d79
Showing 90 changed files with 4,329 additions and 839 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/__init__.py
@@ -57,8 +57,8 @@

__all__ = [
"ActorPoolStrategy",
"Dataset",
"Datastream",
"Dataset", # Backwards compatibility alias.
"DataContext",
"DatasetContext", # Backwards compatibility alias.
"DataIterator",
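A note on the __all__ change above: the export list now puts Datastream first and keeps Dataset as a backwards-compatibility alias. A minimal sketch of what that means for user code, assuming (as the alias comment implies) that both names resolve to the same class:

    import ray
    from ray.data import Dataset, Datastream  # both names are exported after this commit

    ds = ray.data.range(10)

    # The old name keeps working; it is assumed here to alias the new class.
    assert isinstance(ds, Datastream)
    assert isinstance(ds, Dataset)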
2 changes: 1 addition & 1 deletion python/ray/data/_internal/arrow_block.py
@@ -68,7 +68,7 @@ def get_concat_and_sort_transform(context: DataContext) -> Callable:

class ArrowRow(TableRow):
"""
Row of a tabular Dataset backed by a Arrow Table block.
Row of a tabular Datastream backed by a Arrow Table block.
"""

def __getitem__(self, key: str) -> Any:
14 changes: 7 additions & 7 deletions python/ray/data/_internal/block_batching/block_batching.py
@@ -15,7 +15,7 @@
ActorBlockPrefetcher,
)
from ray.data._internal.memory_tracing import trace_deallocation
from ray.data._internal.stats import DatasetPipelineStats, DatasetStats
from ray.data._internal.stats import DatasetPipelineStats, DatastreamStats
from ray.data.block import Block, DataBatch
from ray.data.context import DataContext
from ray.types import ObjectRef
@@ -35,7 +35,7 @@ def nullcontext(enter_result=None):
def batch_block_refs(
block_refs: Iterator[ObjectRef[Block]],
*,
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
prefetch_blocks: int = 0,
clear_block_after_read: bool = False,
batch_size: Optional[int] = None,
@@ -51,8 +51,8 @@ def batch_block_refs(
This takes a block iterator and creates batch_size batches, slicing,
unioning, shuffling, prefetching, and formatting blocks as needed.
This is used by both Dataset.iter_batches()/DatasetPipeline.iter_batches()
and Dataset.map_batches()/DatasetPipeline.map_batches().
This is used by both Datastream.iter_batches()/DatasetPipeline.iter_batches()
and Datastream.map_batches()/DatasetPipeline.map_batches().
Args:
block_refs: An iterator over block object references.
@@ -123,7 +123,7 @@ def batch_block_refs(
def batch_blocks(
blocks: Iterator[Block],
*,
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
batch_size: Optional[int] = None,
batch_format: str = "default",
drop_last: bool = False,
@@ -173,7 +173,7 @@ def _prefetch_blocks(
prefetcher: BlockPrefetcher,
num_blocks_to_prefetch: int,
eager_free: bool = False,
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
) -> Iterator[ObjectRef[Block]]:
"""Given an iterable of Block Object References, returns an iterator
over these object reference while prefetching `num_block_to_prefetch`
@@ -183,7 +183,7 @@
block_ref_iter: An iterator over block object references.
num_blocks_to_prefetch: The number of blocks to prefetch ahead of the
current block during the scan.
stats: Dataset stats object used to store block wait time.
stats: Datastream stats object used to store block wait time.
"""
if num_blocks_to_prefetch == 0:
for block_ref in block_ref_iter:
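Per the docstring above, the helpers in this file sit behind the public batch APIs. A hedged usage sketch of those entry points; the exact keyword arguments shown are assumptions based on the public API of this era, not something this diff touches:

    import ray

    ds = ray.data.range(1000)

    # iter_batches() and map_batches() both funnel through batch_block_refs()
    # according to the docstring above.
    for batch in ds.iter_batches(batch_size=128):
        pass  # consume one batch at a time

    mapped = ds.map_batches(lambda batch: batch)  # identity transform, for illustration only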
10 changes: 5 additions & 5 deletions python/ray/data/_internal/block_batching/iter_batches.py
@@ -20,7 +20,7 @@
make_async_gen,
)
from ray.data._internal.memory_tracing import trace_deallocation
from ray.data._internal.stats import DatasetStats
from ray.data._internal.stats import DatastreamStats
from ray.data.context import DataContext

if sys.version_info >= (3, 7):
@@ -36,7 +36,7 @@ def nullcontext(enter_result=None):
def iter_batches(
block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
*,
stats: Optional[DatasetStats] = None,
stats: Optional[DatastreamStats] = None,
clear_block_after_read: bool = False,
batch_size: Optional[int] = None,
batch_format: Optional[str] = "default",
@@ -83,7 +83,7 @@ def iter_batches(
Args:
block_refs: An iterator over block object references and their corresponding
metadata.
stats: DatasetStats object to record timing and other statistics.
stats: DatastreamStats object to record timing and other statistics.
clear_block_after_read: Whether to clear the block from object store
manually (i.e. without waiting for Python's automatic GC) after it
is read. Doing so will reclaim memory faster and hence reduce the
@@ -185,7 +185,7 @@ def _async_iter_batches(

def _format_in_threadpool(
batch_iter: Iterator[Batch],
stats: DatasetStats,
stats: DatastreamStats,
batch_format: Optional[str],
collate_fn: Optional[Callable[[DataBatch], Any]],
num_threadpool_workers: int,
@@ -194,7 +194,7 @@ def _format_in_threadpool(
Args:
logical_batch_iterator: An iterator over logical batches.
stats: DatasetStats object to record timing and other statistics.
stats: DatastreamStats object to record timing and other statistics.
batch_format: The format in which to return each batch.
Specify "default" to use the current block format (promoting
Arrow to pandas automatically), "pandas" to
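The iterator refactored above handles block prefetching, local shuffling, and batch formatting. A small sketch of the corresponding user-facing knobs; the parameter names are assumptions drawn from the public iter_batches API of this period, not part of this diff:

    import ray

    ds = ray.data.range(10_000)

    # Prefetch upcoming blocks and shuffle rows within a small local buffer
    # while iterating; batch formatting happens in the threadpool described
    # in _format_in_threadpool above.
    for batch in ds.iter_batches(
        batch_size=256,
        prefetch_blocks=1,
        local_shuffle_buffer_size=1_000,
    ):
        pass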
16 changes: 8 additions & 8 deletions python/ray/data/_internal/block_batching/util.py
@@ -14,7 +14,7 @@
CollatedBatch,
BlockPrefetcher,
)
from ray.data._internal.stats import DatasetPipelineStats, DatasetStats
from ray.data._internal.stats import DatasetPipelineStats, DatastreamStats
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

T = TypeVar("T")
@@ -48,7 +48,7 @@ def _calculate_ref_hits(refs: List[ObjectRef[Any]]) -> Tuple[int, int, int]:

def resolve_block_refs(
block_ref_iter: Iterator[ObjectRef[Block]],
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
) -> Iterator[Block]:
"""Resolves the block references for each logical batch.
@@ -80,7 +80,7 @@ def resolve_block_refs(

def blocks_to_batches(
block_iter: Iterator[Block],
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
batch_size: Optional[int] = None,
drop_last: bool = False,
shuffle_buffer_min_size: Optional[int] = None,
@@ -95,7 +95,7 @@ def blocks_to_batches(
Args:
block_iter: An iterator over blocks.
stats: Dataset stats object used to store block batching time.
stats: Datastream stats object used to store block batching time.
batch_size: Record batch size, or None to let the system pick.
drop_last: Whether to drop the last batch if it's incomplete.
shuffle_buffer_min_size: If non-None, the data will be randomly shuffled
@@ -152,7 +152,7 @@ def get_iter_next_batch_s_timer():
def format_batches(
block_iter: Iterator[Batch],
batch_format: Optional[str],
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
stats: Optional[Union[DatastreamStats, DatasetPipelineStats]] = None,
) -> Iterator[Batch]:
"""Given an iterator of blocks, returns an iterator of formatted batches.
@@ -175,7 +175,7 @@
def collate(
batch_iter: Iterator[Batch],
collate_fn: Optional[Callable[[DataBatch], Any]],
stats: Optional[DatasetStats] = None,
stats: Optional[DatastreamStats] = None,
) -> Iterator[CollatedBatch]:
"""Returns an iterator with the provided collate_fn applied to items of the batch
iterator.
@@ -278,7 +278,7 @@ def execute_computation(thread_index: int):
break


PREFETCHER_ACTOR_NAMESPACE = "ray.dataset"
PREFETCHER_ACTOR_NAMESPACE = "ray.datastream"


class WaitBlockPrefetcher(BlockPrefetcher):
@@ -300,7 +300,7 @@ def __init__(self):
@staticmethod
def _get_or_create_actor_prefetcher() -> "ActorHandle":
node_id = ray.get_runtime_context().get_node_id()
actor_name = f"dataset-block-prefetcher-{node_id}"
actor_name = f"datastream-block-prefetcher-{node_id}"
return _BlockPretcher.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(node_id, soft=False),
name=actor_name,
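The prefetcher rename above also changes the actor namespace and the per-node actor name. A hedged sketch of how one could look that actor up under its new name; this is purely illustrative, and the actor only exists once ActorBlockPrefetcher has created it on the node:

    import ray

    ray.init()
    node_id = ray.get_runtime_context().get_node_id()

    # After this commit the prefetch actor is registered in the
    # "ray.datastream" namespace under a "datastream-block-prefetcher-<node id>" name.
    try:
        prefetcher = ray.get_actor(
            f"datastream-block-prefetcher-{node_id}", namespace="ray.datastream"
        )
    except ValueError:
        prefetcher = None  # no prefetcher has been started on this node yet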
4 changes: 2 additions & 2 deletions python/ray/data/_internal/block_list.py
@@ -58,8 +58,8 @@ def _check_if_cleared(self) -> None:
"""Raise an error if this BlockList has been previously cleared."""
if self.is_cleared():
raise ValueError(
"This Dataset's blocks have been moved, which means that you "
"can no longer use this Dataset."
"This Datastream's blocks have been moved, which means that you "
"can no longer use this Datastream."
)

def split(self, split_size: int) -> List["BlockList"]:
10 changes: 5 additions & 5 deletions python/ray/data/_internal/compute.py
@@ -83,7 +83,7 @@ def _apply(

context = DataContext.get_current()

# Handle empty datasets.
# Handle empty datastreams.
if block_list.initial_num_blocks() == 0:
return block_list

@@ -182,10 +182,10 @@ def __eq__(self, other: Any) -> bool:

@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
"""Specify the compute strategy for a Dataset transform.
"""Specify the compute strategy for a Datastream transform.
ActorPoolStrategy specifies that an autoscaling pool of actors should be used
for a given Dataset transform. This is useful for stateful setup of callable
for a given Datastream transform. This is useful for stateful setup of callable
classes.
For a fixed-sized pool of size ``n``, specify ``compute=ActorPoolStrategy(size=n)``.
@@ -209,7 +209,7 @@ def __init__(
max_size: Optional[int] = None,
max_tasks_in_flight_per_actor: Optional[int] = None,
):
"""Construct ActorPoolStrategy for a Dataset transform.
"""Construct ActorPoolStrategy for a Datastream transform.
Args:
size: Specify a fixed size actor pool of this size. It is an error to
@@ -276,7 +276,7 @@ def _apply(
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
) -> BlockList:
"""Note: this is not part of the Dataset public API."""
"""Note: this is not part of the Datastream public API."""
assert not DataContext.get_current().new_execution_backend, "Legacy backend off"
if fn_args is None:
fn_args = tuple()
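The ActorPoolStrategy docstring above describes the compute= pattern for stateful transforms. A hedged example of what that looks like from user code; the callable class, the "value" column name, and the range_table input are illustrative assumptions about the public API of this era:

    import ray
    from ray.data import ActorPoolStrategy


    class AddOffset:
        def __init__(self):
            # Stateful setup runs once per pool actor, which is the use case
            # the docstring above calls out.
            self.offset = 1

        def __call__(self, batch):
            batch["value"] = batch["value"] + self.offset
            return batch


    ds = ray.data.range_table(100)  # tabular datastream with a "value" column (assumed)
    out = ds.map_batches(
        AddOffset,
        compute=ActorPoolStrategy(size=4),  # fixed-size pool, per the docstring above
        batch_format="pandas",
    )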
26 changes: 14 additions & 12 deletions python/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py
@@ -4,50 +4,52 @@
from ray.data.block import Block, BlockMetadata
from ray.data.context import DataContext
from ray.data.dataset_iterator import DataIterator
from ray.data._internal.stats import DatasetStats
from ray.data._internal.stats import DatastreamStats

if TYPE_CHECKING:
import pyarrow
from ray.data import Dataset
from ray.data import Datastream


class DataIteratorImpl(DataIterator):
def __init__(
self,
base_dataset: "Dataset",
base_datastream: "Datastream",
):
self._base_dataset = base_dataset
self._base_datastream = base_datastream
self._base_context = DataContext.get_current()

def __repr__(self) -> str:
return f"DataIterator({self._base_dataset})"
return f"DataIterator({self._base_datastream})"

def _to_block_iterator(
self,
) -> Tuple[
Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats], bool
Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
Optional[DatastreamStats],
bool,
]:
ds = self._base_dataset
ds = self._base_datastream
block_iterator, stats, executor = ds._plan.execute_to_iterator()
ds._current_executor = executor
return block_iterator, stats, False

def stats(self) -> str:
return self._base_dataset.stats()
return self._base_datastream.stats()

def schema(self) -> Union[type, "pyarrow.lib.Schema"]:
return self._base_dataset.schema()
return self._base_datastream.schema()

def __getattr__(self, name):
if name == "_base_dataset":
if name == "_base_datastream":
raise AttributeError()

if hasattr(self._base_dataset, name) and not name.startswith("_"):
if hasattr(self._base_datastream, name) and not name.startswith("_"):
# Raise error for backwards compatibility.
# TODO: remove this method in 2.6.
raise DeprecationWarning(
"session.get_dataset_shard returns a ray.data.DataIterator "
"instead of a Dataset/DatasetPipeline as of Ray v2.3. "
"instead of a Datastream/DatasetPipeline as of Ray v2.3. "
"Use iter_torch_batches(), to_tf(), or iter_batches() to "
"iterate over one epoch. See "
"https://docs.ray.io/en/latest/data/api/dataset_iterator.html "
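The deprecation message above steers Ray Train users toward the iterator methods. A hedged sketch of the intended pattern inside a training function; the dataset name "train" and the loop body are illustrative, and the function is meant to run inside a Trainer:

    from ray.air import session


    def train_loop_per_worker(config):
        # get_dataset_shard() returns a ray.data.DataIterator as of Ray 2.3,
        # so iterate with its batch methods rather than Datastream attributes.
        it = session.get_dataset_shard("train")
        for batch in it.iter_batches(batch_size=32):
            pass  # training step would go here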
@@ -3,7 +3,7 @@
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, DataBatch
from ray.data.dataset_iterator import DataIterator
from ray.data._internal.stats import DatasetStats
from ray.data._internal.stats import DatastreamStats

if TYPE_CHECKING:
import pyarrow
@@ -21,7 +21,7 @@ def __init__(
def __repr__(self) -> str:
return f"DataIterator({self._base_dataset_pipeline})"

def _get_next_dataset(self) -> "DatasetPipeline":
def _get_next_datastream(self) -> "DatasetPipeline":
if self._epoch_iterator is None:
self._epoch_iterator = self._base_dataset_pipeline.iter_epochs()

@@ -31,17 +31,19 @@ def _get_next_dataset(self) -> "DatasetPipeline":
def _to_block_iterator(
self,
) -> Tuple[
Iterator[Tuple[ObjectRef[Block], BlockMetadata]], Optional[DatasetStats], bool
Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
Optional[DatastreamStats],
bool,
]:
epoch_pipeline = self._get_next_dataset()
epoch_pipeline = self._get_next_datastream()

# Peek the first dataset from the pipeline to see if blocks are owned
# Peek the first datastream from the pipeline to see if blocks are owned
# by consumer. If so, the blocks are safe to be eagerly cleared after use
# because memories are not shared across different consumers. This will
# improve the memory efficiency.
if epoch_pipeline._first_dataset is not None:
if epoch_pipeline._first_datastream is not None:
blocks_owned_by_consumer = (
epoch_pipeline._first_dataset._plan.execute()._owned_by_consumer
epoch_pipeline._first_datastream._plan.execute()._owned_by_consumer
)
else:
blocks_owned_by_consumer = (
@@ -94,7 +96,7 @@ def __getattr__(self, name):
# TODO: remove this method in 2.6.
raise DeprecationWarning(
"session.get_dataset_shard returns a ray.data.DataIterator "
"instead of a Dataset/DatasetPipeline as of Ray v2.3. "
"instead of a Datastream/DatasetPipeline as of Ray v2.3. "
"Use iter_torch_batches(), to_tf(), or iter_batches() to "
"iterate over one epoch. See "
"https://docs.ray.io/en/latest/data/api/dataset_iterator.html "
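The pipelined iterator above pulls one Datastream per epoch via iter_epochs(). A minimal public-API sketch of that pattern; the repeat count and batch size are arbitrary, and the method names are assumed from the DatasetPipeline API of this era:

    import ray

    pipe = ray.data.range(100).repeat(2)  # a two-epoch DatasetPipeline

    # Each item yielded by iter_epochs() covers a single epoch, mirroring
    # _get_next_datastream() in the iterator implementation above.
    for epoch in pipe.iter_epochs():
        for batch in epoch.iter_batches(batch_size=10):
            pass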