
Commit

[Datasets] [Operator Fusion - 2/N] Data layer performance/bug fixes and tweaks. (ray-project#32744)

This PR contains some miscellaneous performance/bug fixes discovered while benchmarking the zero-copy adapters in ray-project#32178, along with some minor changes.
clarkzinzow authored Feb 23, 2023
1 parent 9f2145b commit b081dd2
Showing 11 changed files with 118 additions and 43 deletions.
79 changes: 48 additions & 31 deletions python/ray/air/util/data_batch_conversion.py
@@ -237,6 +237,12 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFrame":
Cast all NumPy ndarray columns in df to our tensor extension type, TensorArray.
"""
pd = _lazy_import_pandas()
try:
SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
# SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
SettingWithCopyWarning = pd.errors.SettingWithCopyWarning

from ray.air.util.tensor_extensions.pandas import (
TensorArray,
column_needs_tensor_extension,
@@ -246,42 +252,53 @@ def _cast_ndarray_columns_to_tensor_extension(df: "pd.DataFrame") -> "pd.DataFrame":
# TODO(Clark): Once Pandas supports registering extension types for type
# inference on construction, implement as much for NumPy ndarrays and remove
# this. See https://github.com/pandas-dev/pandas/issues/41848
with pd.option_context("chained_assignment", None):
for col_name, col in df.items():
if column_needs_tensor_extension(col):
try:
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
df.loc[:, col_name] = TensorArray(col)
except Exception as e:
raise ValueError(
f"Tried to cast column {col_name} to the TensorArray tensor "
"extension type but the conversion failed. To disable "
"automatic casting to this tensor extension, set "
"ctx = DatasetContext.get_current(); "
"ctx.enable_tensor_extension_casting = False."
) from e
# TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of
# column names containing tensor columns, to make this an O(# of tensor columns)
# check rather than the current O(# of columns) check.
for col_name, col in df.items():
if column_needs_tensor_extension(col):
try:
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
warnings.simplefilter("ignore", category=SettingWithCopyWarning)
df.loc[:, col_name] = TensorArray(col)
except Exception as e:
raise ValueError(
f"Tried to cast column {col_name} to the TensorArray tensor "
"extension type but the conversion failed. To disable "
"automatic casting to this tensor extension, set "
"ctx = DatasetContext.get_current(); "
"ctx.enable_tensor_extension_casting = False."
) from e
return df


def _cast_tensor_columns_to_ndarrays(df: "pd.DataFrame") -> "pd.DataFrame":
"""Cast all tensor extension columns in df to NumPy ndarrays."""
pd = _lazy_import_pandas()
try:
SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
# SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
SettingWithCopyWarning = pd.errors.SettingWithCopyWarning
from ray.air.util.tensor_extensions.pandas import TensorDtype

with pd.option_context("chained_assignment", None):
# Try to convert any tensor extension columns to ndarray columns.
for col_name, col in df.items():
if isinstance(col.dtype, TensorDtype):
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
df.loc[:, col_name] = pd.Series(list(col.to_numpy()))
return df
# Try to convert any tensor extension columns to ndarray columns.
# TODO(Clark): Optimize this with propagated DataFrame metadata containing a list of
# column names containing tensor columns, to make this an O(# of tensor columns)
# check rather than the current O(# of columns) check.
for col_name, col in df.items():
if isinstance(col.dtype, TensorDtype):
# Suppress Pandas warnings:
# https://github.com/ray-project/ray/issues/29270
# We actually want in-place operations so we suppress this warning.
# https://stackoverflow.com/a/74193599
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
warnings.simplefilter("ignore", category=SettingWithCopyWarning)
df.loc[:, col_name] = pd.Series(list(col.to_numpy()))
return df
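
For reference, the compatibility shim added at the top of both functions exists because SettingWithCopyWarning moved from pd.core.common to pd.errors in Pandas 1.5.0. Below is a minimal, version-agnostic sketch of the suppression pattern used above, with a toy ndarray column standing in for Ray's TensorArray:

import warnings
import numpy as np
import pandas as pd

try:
    SettingWithCopyWarning = pd.core.common.SettingWithCopyWarning
except AttributeError:
    # SettingWithCopyWarning was moved to pd.errors in Pandas 1.5.0.
    SettingWithCopyWarning = pd.errors.SettingWithCopyWarning

df = pd.DataFrame({"image": [np.zeros((2, 2)) for _ in range(4)]})
with warnings.catch_warnings():
    # The assignment below is intentionally in-place on df, so silence the
    # chained-assignment and future-behavior warnings it could otherwise raise.
    warnings.simplefilter("ignore", category=FutureWarning)
    warnings.simplefilter("ignore", category=SettingWithCopyWarning)
    df.loc[:, "image"] = pd.Series(list(df["image"].to_numpy()))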
4 changes: 4 additions & 0 deletions python/ray/data/_internal/arrow_block.py
@@ -126,6 +126,10 @@ def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block:
def _concat_tables(tables: List[Block]) -> Block:
return transform_pyarrow.concat(tables)

@staticmethod
def _concat_would_copy() -> bool:
return False

@staticmethod
def _empty_table() -> "pyarrow.Table":
return pyarrow.Table.from_pydict({})
9 changes: 4 additions & 5 deletions python/ray/data/_internal/batcher.py
@@ -102,11 +102,12 @@ def next_batch(self) -> Block:
A batch represented as a Block.
"""
assert self.has_batch() or (self._done_adding and self.has_any())
needs_copy = self._ensure_copy
# If no batch size, short-circuit.
if self._batch_size is None:
assert len(self._buffer) == 1
block = self._buffer[0]
if self._ensure_copy:
if needs_copy:
# Copy block if needing to ensure fresh batch copy.
block = BlockAccessor.for_block(block)
block = block.slice(0, block.num_rows(), copy=True)
@@ -139,13 +140,11 @@ def next_batch(self) -> Block:
# blocks consumed on the next batch extraction.
self._buffer = leftover
self._buffer_size -= self._batch_size
needs_copy = needs_copy and not output.will_build_yield_copy()
batch = output.build()
if self._ensure_copy:
if needs_copy:
# Need to ensure that the batch is a fresh copy.
batch = BlockAccessor.for_block(batch)
# TODO(Clark): This copy will often be unnecessary, e.g. for pandas
# DataFrame batches that have required concatenation to construct, which
# always requires a copy. We should elide this copy in those cases.
batch = batch.slice(0, batch.num_rows(), copy=True)
return batch

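The batcher change above elides a redundant copy: when the builder's build() already materializes a new block (for example, a copying concatenation of several input blocks), the defensive slice-copy at the end is skipped. A rough, hedged sketch of how that decision fits together, using the internal builder and accessor APIs that appear in this PR (build_batch is an illustrative helper, not the actual Batcher method):

from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data.block import BlockAccessor

def build_batch(blocks, ensure_copy: bool):
    builder = DelegatingBlockBuilder()
    for block in blocks:
        builder.add_block(block)
    # If building already yields a fresh copy, the explicit copy below is redundant.
    needs_copy = ensure_copy and not builder.will_build_yield_copy()
    batch = builder.build()
    if needs_copy:
        accessor = BlockAccessor.for_block(batch)
        batch = accessor.slice(0, accessor.num_rows(), copy=True)
    return batch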
4 changes: 4 additions & 0 deletions python/ray/data/_internal/block_builder.py
@@ -18,6 +18,10 @@ def add_block(self, block: Block) -> None:
"""Append an entire block to the block being built."""
raise NotImplementedError

def will_build_yield_copy(self) -> bool:
"""Whether building this block will yield a new block copy."""
raise NotImplementedError

def build(self) -> Block:
"""Build the block."""
raise NotImplementedError
5 changes: 5 additions & 0 deletions python/ray/data/_internal/delegating_block_builder.py
@@ -55,6 +55,11 @@ def add_block(self, block: Block):
self._builder = accessor.builder()
self._builder.add_block(block)

def will_build_yield_copy(self) -> bool:
if self._builder is None:
return True
return self._builder.will_build_yield_copy()

def build(self) -> Block:
if self._builder is None:
if self._empty_block is not None:
@@ -88,7 +88,8 @@ def start(self, options: ExecutionOptions):
def _start_actor(self):
"""Start a new actor and add it to the actor pool as a pending actor."""
assert self._cls is not None
actor = self._cls.remote()
ctx = DatasetContext.get_current()
actor = self._cls.remote(ctx)
self._actor_pool.add_pending_actor(actor, actor.get_location.remote())

def _add_bundled_input(self, bundle: RefBundle):
@@ -279,6 +280,9 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any]:
class _MapWorker:
"""An actor worker for MapOperator."""

def __init__(self, ctx: DatasetContext):
DatasetContext._set_current(ctx)

def get_location(self) -> NodeIdStr:
return ray.get_runtime_context().get_node_id()

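The actor-pool hunks above fix a context-propagation gap: previously, _MapWorker actors were created without the driver's DatasetContext, so context-dependent settings could silently differ inside workers. A hedged, standalone sketch of the same pattern (the worker class below is illustrative, not the one in this PR):

import ray
from ray.data.context import DatasetContext

@ray.remote
class ContextAwareWorker:
    def __init__(self, ctx: DatasetContext):
        # Re-install the driver's context in this worker process so settings such as
        # tensor-extension casting behave consistently across driver and workers.
        DatasetContext._set_current(ctx)

    def get_location(self) -> str:
        return ray.get_runtime_context().get_node_id()

ray.init(ignore_reinit_error=True)
ctx = DatasetContext.get_current()
worker = ContextAwareWorker.remote(ctx)
node_id = ray.get(worker.get_location.remote())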
6 changes: 5 additions & 1 deletion python/ray/data/_internal/pandas_block.py
@@ -106,14 +106,18 @@ def _concat_tables(tables: List["pandas.DataFrame"]) -> "pandas.DataFrame":

if len(tables) > 1:
df = pandas.concat(tables, ignore_index=True)
df.reset_index(drop=True, inplace=True)
else:
df = tables[0]
df.reset_index(drop=True, inplace=True)
ctx = DatasetContext.get_current()
if ctx.enable_tensor_extension_casting:
df = _cast_ndarray_columns_to_tensor_extension(df)
return df

@staticmethod
def _concat_would_copy() -> bool:
return True

@staticmethod
def _empty_table() -> "pandas.DataFrame":
pandas = lazy_import_pandas()
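The new _concat_would_copy hooks encode a backend difference that the batcher relies on: concatenating pandas DataFrames materializes a new frame (so the pandas builder returns True), while pyarrow can concatenate tables into chunked arrays without copying the underlying buffers (so the Arrow builder returns False). A small illustration of that asymmetry, assuming tiny example tables:

import pandas as pd
import pyarrow as pa

# pandas.concat copies the column data into a brand-new DataFrame.
merged_df = pd.concat(
    [pd.DataFrame({"a": [1, 2, 3]}), pd.DataFrame({"a": [4, 5, 6]})],
    ignore_index=True,
)

# pyarrow stitches tables together as chunked arrays, reusing the input buffers.
merged_table = pa.concat_tables(
    [pa.table({"a": [1, 2, 3]}), pa.table({"a": [4, 5, 6]})]
)
print(merged_table.column("a").num_chunks)  # 2: original buffers live on as chunks.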
6 changes: 4 additions & 2 deletions python/ray/data/_internal/simple_block.py
@@ -46,12 +46,14 @@ def add_block(self, block: List[T]) -> None:
f"{block}"
)
self._items.extend(block)
for item in block:
self._size_estimator.add(item)
self._size_estimator.add_block(block)

def num_rows(self) -> int:
return len(self._items)

def will_build_yield_copy(self) -> bool:
return True

def build(self) -> Block:
return list(self._items)

16 changes: 15 additions & 1 deletion python/ray/data/_internal/size_estimator.py
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, List

import ray
from ray import cloudpickle
@@ -27,6 +27,20 @@ def add(self, item: Any) -> None:
elif self._count % 100 == 0:
self._running_mean.add(self._real_size(item), weight=100)

def add_block(self, block: List[Any]) -> None:
if self._count < 10:
for i in range(min(10 - self._count, len(block))):
self._running_mean.add(self._real_size(block[i]), weight=1)
if self._count < 100:
for i in range(
10 - (self._count % 10), min(100 - self._count, len(block)), 10
):
self._running_mean.add(self._real_size(block[i]), weight=10)
if (len(block) + (self._count % 100)) // 100 > 1:
for i in range(100 - (self._count % 100), len(block), 100):
self._running_mean.add(self._real_size(block[i]), weight=100)
self._count += len(block)

def size_bytes(self) -> int:
return int(self._running_mean.mean * self._count)

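The new add_block() method keeps roughly the same sampling scheme as calling add() once per item: all of the first ~10 items are measured with weight 1, then every 10th item up to ~100 with weight 10, then every 100th item with weight 100, while the count advances by the whole block length. A hedged usage sketch (it assumes a running Ray session, since measuring sampled items goes through Ray's serializer):

import ray
from ray.data._internal.size_estimator import SizeEstimator

ray.init(ignore_reinit_error=True)

estimator = SizeEstimator()
# Adding a whole block samples only a subset of items instead of serializing
# each element, which is the point of the new bulk API.
estimator.add_block([{"id": i, "payload": "x" * 100} for i in range(1_000)])
print(estimator.size_bytes())  # Estimated total byte size of the 1,000 items.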
22 changes: 21 additions & 1 deletion python/ray/data/_internal/table_block.py
@@ -29,6 +29,14 @@ def __init__(self, block_type):
self._column_names = None
# The set of compacted tables we have built so far.
self._tables: List[Any] = []
# Cursor into tables indicating up to which table we've accumulated table sizes.
# This is used to defer table size calculation, which can be expensive for e.g.
# Pandas DataFrames.
# This cursor points to the first table for which we haven't accumulated a table
# size.
self._tables_size_cursor = 0
# Accumulated table sizes, up to the table in _tables pointed to by
# _tables_size_cursor.
self._tables_size_bytes = 0
# Size estimator for un-compacted table values.
self._uncompacted_size = SizeEstimator()
@@ -76,7 +84,6 @@ def add_block(self, block: Any) -> None:
)
accessor = BlockAccessor.for_block(block)
self._tables.append(block)
self._tables_size_bytes += accessor.size_bytes()
self._num_rows += accessor.num_rows()

@staticmethod
@@ -91,6 +98,16 @@ def _concat_tables(tables: List[Block]) -> Block:
def _empty_table() -> Any:
raise NotImplementedError

@staticmethod
def _concat_would_copy() -> bool:
raise NotImplementedError

def will_build_yield_copy(self) -> bool:
if self._columns:
# Building a table from a dict of list columns always creates a copy.
return True
return self._concat_would_copy() and len(self._tables) > 1

def build(self) -> Block:
if self._columns:
tables = [self._table_from_pydict(self._columns)]
@@ -108,6 +125,9 @@ def num_rows(self) -> int:
def get_estimated_memory_usage(self) -> int:
if self._num_rows == 0:
return 0
for table in self._tables[self._tables_size_cursor :]:
self._tables_size_bytes += BlockAccessor.for_block(table).size_bytes()
self._tables_size_cursor = len(self._tables)
return self._tables_size_bytes + self._uncompacted_size.size_bytes()

def _compact_if_needed(self) -> None:
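Two related tweaks land in the table block builder: per-table size computation is deferred behind _tables_size_cursor so add_block() stays cheap and sizes are only tallied when get_estimated_memory_usage() is called, and will_build_yield_copy() lets callers (like the batcher above) know whether build() will already allocate a fresh block. A standalone sketch of the lazy-accumulation idea, with hypothetical names, in case the intent is not obvious from the diff:

from typing import Any, Callable, List

class LazySizeAccumulator:
    """Defer expensive per-item size computation until a total is requested."""

    def __init__(self, size_fn: Callable[[Any], int]):
        self._items: List[Any] = []
        self._size_fn = size_fn
        self._cursor = 0      # First item whose size has not been accumulated yet.
        self._size_bytes = 0  # Accumulated size of items[:cursor].

    def add(self, item: Any) -> None:
        # O(1) on the hot path: no size computation at add time.
        self._items.append(item)

    def size_bytes(self) -> int:
        # Catch up on items added since the last query, then cache the total.
        for item in self._items[self._cursor:]:
            self._size_bytes += self._size_fn(item)
        self._cursor = len(self._items)
        return self._size_bytes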
4 changes: 3 additions & 1 deletion python/ray/data/dataset.py
@@ -3990,7 +3990,9 @@ def lazy(self) -> "Dataset[T]":
``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is
manually triggered via ``.fully_executed()``.
"""
ds = Dataset(self._plan, self._epoch, lazy=True)
ds = Dataset(
self._plan, self._epoch, lazy=True, logical_plan=self._logical_plan
)
ds._set_uuid(self._get_uuid())
return ds

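The dataset.py fix simply carries the logical plan through Dataset.lazy(), so the lazy copy still participates in logical-plan optimizations such as the operator fusion this PR series is building toward. A brief usage sketch with the Ray Datasets API as it existed around this commit, shown for context rather than taken from the PR:

import ray

ds = ray.data.range(1000).lazy()
# Transformations on a lazy dataset are recorded rather than executed eagerly.
ds = ds.map_batches(lambda batch: batch)
# Execution is triggered by consumption (iter_batches(), take(), ...) or explicitly:
ds = ds.fully_executed()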
