[SPARK-45143][PYTHON][CONNECT] Make PySpark compatible with PyArrow 13.0.0

### What changes were proposed in this pull request?
1. In PyArrow 13.0.0, the default behavior of [Table#to_pandas](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas) and [ChunkedArray#to_pandas](https://arrow.apache.org/docs/python/generated/pyarrow.ChunkedArray.html#pyarrow.ChunkedArray.to_pandas) changed, so this PR sets `coerce_temporal_nanoseconds=True` to keep the previous nanosecond coercion (see the sketch after the examples below).

2. There is another undocumented breaking change in the data type conversion [`TimestampType#to_pandas_dtype`](https://arrow.apache.org/docs/python/generated/pyarrow.TimestampType.html#pyarrow.TimestampType.to_pandas_dtype):

12.0.1:
```
In [1]: import pyarrow as pa

In [2]: pa.timestamp("us", tz=None).to_pandas_dtype()
Out[2]: dtype('<M8[ns]')

In [3]: pa.timestamp("ns", tz=None).to_pandas_dtype()
Out[3]: dtype('<M8[ns]')

In [4]: pa.timestamp("us", tz="UTC").to_pandas_dtype()
Out[4]: datetime64[ns, UTC]

In [5]: pa.timestamp("ns", tz="UTC").to_pandas_dtype()
Out[5]: datetime64[ns, UTC]
```

13.0.0:
```
In [1]: import pyarrow as pa

In [2]: pa.timestamp("us", tz=None).to_pandas_dtype()
Out[2]: dtype('<M8[us]')

In [3]: pa.timestamp("ns", tz=None).to_pandas_dtype()
Out[3]: dtype('<M8[ns]')

In [4]: pa.timestamp("us", tz="UTC").to_pandas_dtype()
Out[4]: datetime64[us, UTC]

In [5]: pa.timestamp("ns", tz="UTC").to_pandas_dtype()
Out[5]: datetime64[ns, UTC]
```
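
For illustration, here is a minimal sketch of change 1 (not part of this PR's code; it assumes PyArrow >= 13.0.0 and pandas >= 2.0, since older pandas cannot represent non-nanosecond units):

```
import pyarrow as pa

tbl = pa.table({"ts": pa.array([0], type=pa.timestamp("us"))})

# PyArrow 13.0.0 now preserves the microsecond unit by default ...
print(tbl.to_pandas()["ts"].dtype)  # datetime64[us]

# ... while coerce_temporal_nanoseconds=True restores the pre-13 coercion.
print(tbl.to_pandas(coerce_temporal_nanoseconds=True)["ts"].dtype)  # datetime64[ns]
```

The fix below applies this keyword at every `to_pandas` call site, gated on the installed PyArrow version because older releases reject the unknown keyword.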

### Why are the changes needed?
Make PySpark compatible with PyArrow 13.0.0

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#42920 from zhengruifeng/py_pyarrow_13.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
zhengruifeng authored and dongjoon-hyun committed Sep 15, 2023
1 parent 5d4ca79 commit c1c710e
Showing 6 changed files with 56 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
```
@@ -263,7 +263,7 @@ jobs:
       - name: Install Python packages (Python 3.8)
         if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
         run: |
-          python3.8 -m pip install 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
+          python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3'
           python3.8 -m pip list
       # Run the tests.
       - name: Run tests
@@ -728,7 +728,7 @@ jobs:
           # See also https://issues.apache.org/jira/browse/SPARK-38279.
           python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
           python3.9 -m pip install ipython_genutils # See SPARK-38517
-          python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+          python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8'
           python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
           apt-get update -y
           apt-get install -y ruby ruby-dev
```
2 changes: 1 addition & 1 deletion dev/infra/Dockerfile
```
@@ -85,7 +85,7 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
 ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
 
 RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
-RUN python3.9 -m pip install numpy 'pyarrow==12.0.1' 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
 
 # Add Python deps for Spark Connect.
 RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
```
4 changes: 3 additions & 1 deletion python/pyspark/pandas/typedef/typehints.py
```
@@ -293,7 +293,9 @@ def spark_type_to_pandas_dtype(
         ),
     ):
         return np.dtype("object")
-    elif isinstance(spark_type, types.TimestampType):
+    elif isinstance(spark_type, types.DayTimeIntervalType):
+        return np.dtype("timedelta64[ns]")
+    elif isinstance(spark_type, (types.TimestampType, types.TimestampNTZType)):
         return np.dtype("datetime64[ns]")
     else:
         return np.dtype(to_arrow_type(spark_type).to_pandas_dtype())
```
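
As a quick sanity check of the new mapping, a hypothetical snippet (assuming `spark_type_to_pandas_dtype` is importable from `pyspark.pandas.typedef`; names as in the diff above):

```
from pyspark.sql import types
from pyspark.pandas.typedef import spark_type_to_pandas_dtype

# Timestamps (with and without time zone) and day-time intervals now map to
# fixed nanosecond-based NumPy dtypes instead of whatever
# to_arrow_type(...).to_pandas_dtype() returns for the installed PyArrow.
print(spark_type_to_pandas_dtype(types.TimestampType()))        # datetime64[ns]
print(spark_type_to_pandas_dtype(types.TimestampNTZType()))     # datetime64[ns]
print(spark_type_to_pandas_dtype(types.DayTimeIntervalType()))  # timedelta64[ns]
```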
28 changes: 20 additions & 8 deletions python/pyspark/sql/connect/client/core.py
```
@@ -33,6 +33,7 @@
 import urllib.parse
 import uuid
 import sys
+from distutils.version import LooseVersion
 from types import TracebackType
 from typing import (
     Iterable,
@@ -880,19 +881,30 @@ def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":
 
         # Rename columns to avoid duplicated column names.
         renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+
+        pandas_options = {}
         if self_destruct:
             # Configure PyArrow to use as little memory as possible:
             # self_destruct - free columns as they are converted
             # split_blocks - create a separate Pandas block for each column
             # use_threads - convert one column at a time
-            pandas_options = {
-                "self_destruct": True,
-                "split_blocks": True,
-                "use_threads": False,
-            }
-            pdf = renamed_table.to_pandas(**pandas_options)
-        else:
-            pdf = renamed_table.to_pandas()
+            pandas_options.update(
+                {
+                    "self_destruct": True,
+                    "split_blocks": True,
+                    "use_threads": False,
+                }
+            )
+        if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+            # A legacy option to coerce date32, date64, duration, and timestamp
+            # time units to nanoseconds when converting to pandas.
+            # This option can only be added since 13.0.0.
+            pandas_options.update(
+                {
+                    "coerce_temporal_nanoseconds": True,
+                }
+            )
+        pdf = renamed_table.to_pandas(**pandas_options)
         pdf.columns = schema.names
 
         if len(pdf.columns) > 0:
```
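
Distilled from the hunk above, the refactor replaces two separate `to_pandas` calls with one options dict built up conditionally, since passing `coerce_temporal_nanoseconds` to PyArrow < 13.0.0 would raise a `TypeError`. A standalone sketch (with a local table standing in for the collected Connect result):

```
from distutils.version import LooseVersion
import pyarrow as pa

table = pa.table({"col_0": [1, 2, 3]})  # stand-in for the collected result
self_destruct = True  # normally taken from the client configuration

pandas_options = {}
if self_destruct:
    # Free Arrow columns as they are converted to minimize peak memory.
    pandas_options.update({"self_destruct": True, "split_blocks": True, "use_threads": False})
if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
    # Only PyArrow >= 13.0.0 accepts this keyword.
    pandas_options.update({"coerce_temporal_nanoseconds": True})

pdf = table.to_pandas(**pandas_options)
```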
16 changes: 14 additions & 2 deletions python/pyspark/sql/pandas/conversion.py
```
@@ -26,6 +26,7 @@
     TYPE_CHECKING,
 )
 from warnings import warn
+from distutils.version import LooseVersion
 
 from pyspark.errors.exceptions.captured import unwrap_spark_exception
 from pyspark.rdd import _load_from_socket
@@ -125,19 +126,30 @@ def toPandas(self) -> "PandasDataFrameLike":
         # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
         if use_arrow:
             try:
-                import pyarrow
+                import pyarrow as pa
 
                 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
                 batches = self._collect_as_arrow(split_batches=self_destruct)
                 if len(batches) > 0:
-                    table = pyarrow.Table.from_batches(batches)
+                    table = pa.Table.from_batches(batches)
                     # Ensure only the table has a reference to the batches, so that
                     # self_destruct (if enabled) is effective
                     del batches
                     # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pandas_options = {"date_as_object": True}
+
+                    if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+                        # A legacy option to coerce date32, date64, duration, and timestamp
+                        # time units to nanoseconds when converting to pandas.
+                        # This option can only be added since 13.0.0.
+                        pandas_options.update(
+                            {
+                                "coerce_temporal_nanoseconds": True,
+                            }
+                        )
+
                     if self_destruct:
                         # Configure PyArrow to use as little memory as possible:
                         # self_destruct - free columns as they are converted
```
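
The pre-existing `date_as_object` option that the new keyword joins here can be shown in isolation (a sketch independent of Spark):

```
import datetime
import pyarrow as pa

tbl = pa.table({"d": [datetime.date(2023, 9, 15)]})

# date_as_object=True keeps plain datetime.date values, matching PySpark's
# behavior when Arrow optimization is disabled; with False the column becomes
# a datetime64 dtype instead.
print(type(tbl.to_pandas(date_as_object=True)["d"][0]))  # <class 'datetime.date'>
print(tbl.to_pandas(date_as_object=False)["d"].dtype)    # a datetime64 dtype
```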
17 changes: 16 additions & 1 deletion python/pyspark/sql/pandas/serializers.py
```
@@ -185,7 +185,22 @@ def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict", ndarray_as_list
         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
         # datetime64[ns] type handling.
         # Cast dates to objects instead of datetime64[ns] dtype to avoid overflow.
-        s = arrow_column.to_pandas(date_as_object=True)
+        pandas_options = {"date_as_object": True}
+
+        import pyarrow as pa
+        from distutils.version import LooseVersion
+
+        if LooseVersion(pa.__version__) >= LooseVersion("13.0.0"):
+            # A legacy option to coerce date32, date64, duration, and timestamp
+            # time units to nanoseconds when converting to pandas.
+            # This option can only be added since 13.0.0.
+            pandas_options.update(
+                {
+                    "coerce_temporal_nanoseconds": True,
+                }
+            )
+
+        s = arrow_column.to_pandas(**pandas_options)
 
         # TODO(SPARK-43579): cache the converter for reuse
         converter = _create_converter_to_pandas(
```
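
`arrow_to_pandas` receives a `pyarrow.ChunkedArray`, whose `to_pandas` accepts the same keyword; a minimal sketch of the gated call (assuming PyArrow >= 13.0.0):

```
import pyarrow as pa

col = pa.chunked_array([pa.array([0], type=pa.duration("us"))])

# duration is among the coerced types: the option yields timedelta64[ns]
# even though the Arrow data has microsecond resolution.
print(col.to_pandas(coerce_temporal_nanoseconds=True).dtype)  # timedelta64[ns]
```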
