[SPARK-45166][PYTHON] Clean up unused code paths for pyarrow<4
### What changes were proposed in this pull request?
Clean up unused code paths for pyarrow<4
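For context, the gates being deleted follow the pattern sketched below. This is an illustrative sketch distilled from the pandas/__init__.py hunk further down, not the verbatim file contents (the warning text is shortened here):

```python
# Before: a runtime gate that only distinguished pyarrow < 2.0.0.
from distutils.version import LooseVersion
import os
import warnings

import pyarrow

if (
    LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0")
    and "PYARROW_IGNORE_TIMEZONE" not in os.environ
):
    warnings.warn("'PYARROW_IGNORE_TIMEZONE' environment variable was not set.")

# After: with pyarrow >= 4.0.0 guaranteed, the version comparison is dead code
# and the check collapses to the environment-variable test alone.
if "PYARROW_IGNORE_TIMEZONE" not in os.environ:
    warnings.warn("'PYARROW_IGNORE_TIMEZONE' environment variable was not set.")
```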

### Why are the changes needed?
The minimum supported PyArrow version was upgraded to 4.0.0 in https://issues.apache.org/jira/browse/SPARK-44183, so the code paths guarding older PyArrow releases are no longer reachable.
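Because of that bump, the install-time guard already rejects anything older than 4.0.0, so per-call version branches can never be taken. A minimal sketch of the reasoning, assuming `pyspark.sql.pandas.utils.require_minimum_pyarrow_version` keeps its current name and ImportError behavior:

```python
# Minimal sketch (assumption: this helper exists under this name and raises
# ImportError when pyarrow is missing or below the supported minimum).
from pyspark.sql.pandas.utils import require_minimum_pyarrow_version

require_minimum_pyarrow_version()  # passes only when a supported pyarrow (>= 4.0.0) is installed

# Therefore any branch of the form
#     if LooseVersion(pa.__version__) < LooseVersion("2.0.0"): ...
# is unreachable and can be deleted along with its LooseVersion import.
```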

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated CI.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#42928 from zhengruifeng/py_pa_version.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
zhengruifeng authored and HyukjinKwon committed Sep 15, 2023
1 parent c490d8c commit 74d3d87
Showing 4 changed files with 13 additions and 136 deletions.
9 changes: 1 addition & 8 deletions python/pyspark/pandas/__init__.py
@@ -23,7 +23,6 @@
import os
import sys
import warnings
from distutils.version import LooseVersion
from typing import Any

from pyspark.pandas.missing.general_functions import MissingPandasLikeGeneralFunctions
@@ -40,13 +39,7 @@
else:
raise


import pyarrow

if (
LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0")
and "PYARROW_IGNORE_TIMEZONE" not in os.environ
):
if "PYARROW_IGNORE_TIMEZONE" not in os.environ:
warnings.warn(
"'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
"set this environment variable to '1' in both driver and executor sides if you use "
20 changes: 0 additions & 20 deletions python/pyspark/sql/pandas/types.py
@@ -60,7 +60,6 @@

def to_arrow_type(dt: DataType) -> "pa.DataType":
"""Convert Spark data type to pyarrow type"""
from distutils.version import LooseVersion
import pyarrow as pa

if type(dt) == BooleanType:
@@ -93,21 +92,9 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
elif type(dt) == DayTimeIntervalType:
arrow_type = pa.duration("us")
elif type(dt) == ArrayType:
if type(dt.elementType) == StructType and LooseVersion(pa.__version__) < LooseVersion(
"2.0.0"
):
raise PySparkTypeError(
error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
message_parameters={"data_type": "Array of StructType"},
)
field = pa.field("element", to_arrow_type(dt.elementType), nullable=dt.containsNull)
arrow_type = pa.list_(field)
elif type(dt) == MapType:
if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
raise PySparkTypeError(
error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
message_parameters={"data_type": "MapType"},
)
key_field = pa.field("key", to_arrow_type(dt.keyType), nullable=False)
value_field = pa.field("value", to_arrow_type(dt.valueType), nullable=dt.valueContainsNull)
arrow_type = pa.map_(key_field, value_field)
@@ -142,8 +129,6 @@ def to_arrow_schema(schema: StructType) -> "pa.Schema":

def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> DataType:
"""Convert pyarrow type to Spark data type."""
from distutils.version import LooseVersion
import pyarrow as pa
import pyarrow.types as types

spark_type: DataType
@@ -182,11 +167,6 @@ def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> DataType:
elif types.is_list(at):
spark_type = ArrayType(from_arrow_type(at.value_type, prefer_timestamp_ntz))
elif types.is_map(at):
if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
raise PySparkTypeError(
error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
message_parameters={"data_type": "MapType"},
)
spark_type = MapType(
from_arrow_type(at.key_type, prefer_timestamp_ntz),
from_arrow_type(at.item_type, prefer_timestamp_ntz),
12 changes: 3 additions & 9 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -22,7 +22,6 @@
import unittest
from datetime import date, datetime
from decimal import Decimal
from distutils.version import LooseVersion
from typing import cast

from pyspark import TaskContext
@@ -599,14 +598,9 @@ def test_vectorized_udf_map_type(self):
schema = StructType([StructField("map", MapType(StringType(), LongType()))])
df = self.spark.createDataFrame(data, schema=schema)
for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
with QuietTest(self.sc):
with self.assertRaisesRegex(Exception, "MapType.*not supported"):
pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
else:
map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
result = df.select(map_f(col("map")))
self.assertEqual(df.collect(), result.collect())
map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
result = df.select(map_f(col("map")))
self.assertEqual(df.collect(), result.collect())

def test_vectorized_udf_complex(self):
df = self.spark.range(10).select(
108 changes: 9 additions & 99 deletions python/pyspark/sql/tests/test_arrow.py
@@ -20,8 +20,6 @@
import threading
import time
import unittest
import warnings
from distutils.version import LooseVersion
from typing import cast
from collections import namedtuple

@@ -195,46 +193,6 @@ def create_np_arrs(self):
+ [np.array([[0.1, 0.1, 0.1], [0.2, 0.2, 0.2]]).astype(t) for t in float_dtypes]
)

@unittest.skipIf(
not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
"will not fallback with pyarrow>=2.0",
)
def test_toPandas_fallback_enabled(self):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
schema = StructType([StructField("a", ArrayType(StructType()), True)])
df = self.spark.createDataFrame([([Row()],)], schema=schema)
with QuietTest(self.sc):
with self.warnings_lock:
with warnings.catch_warnings(record=True) as warns:
# we want the warnings to appear even if this test is run from a subclass
warnings.simplefilter("always")
pdf = df.toPandas()
# Catch and check the last UserWarning.
user_warns = [
warn.message for warn in warns if isinstance(warn.message, UserWarning)
]
self.assertTrue(len(user_warns) > 0)
self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
assert_frame_equal(pdf, pd.DataFrame({"a": [[Row()]]}))

@unittest.skipIf(
not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
"will not fallback with pyarrow>=2.0",
)
def test_toPandas_fallback_disabled(self):
schema = StructType([StructField("a", ArrayType(StructType()), True)])
df = self.spark.createDataFrame([(None,)], schema=schema)
with QuietTest(self.sc):
with self.warnings_lock:
with self.assertRaises(PySparkTypeError) as pe:
df.toPandas()

self.check_error(
exception=pe.exception,
error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
message_parameters={"data_type": "Array of StructType"},
)

def test_toPandas_empty_df_arrow_enabled(self):
for arrow_enabled in [True, False]:
with self.subTest(arrow_enabled=arrow_enabled):
@@ -654,17 +612,13 @@ def check_createDataFrame_with_map_type(self, arrow_enabled):
):
with self.subTest(schema=schema):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
self.spark.createDataFrame(pdf, schema=schema).collect()
else:
df = self.spark.createDataFrame(pdf, schema=schema)
df = self.spark.createDataFrame(pdf, schema=schema)

result = df.collect()
result = df.collect()

for row in result:
i, m = row
self.assertEqual(m, map_data[i])
for row in result:
i, m = row
self.assertEqual(m, map_data[i])

def test_createDataFrame_with_struct_type(self):
for arrow_enabled in [True, False]:
@@ -732,12 +686,8 @@ def check_toPandas_with_map_type(self, arrow_enabled):
df = self.spark.createDataFrame(origin, schema=schema)

with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
df.toPandas()
else:
pdf = df.toPandas()
assert_frame_equal(origin, pdf)
pdf = df.toPandas()
assert_frame_equal(origin, pdf)

def test_toPandas_with_map_type_nulls(self):
with QuietTest(self.sc):
@@ -758,12 +708,8 @@ def check_toPandas_with_map_type_nulls(self, arrow_enabled):
df = self.spark.createDataFrame(origin, schema=schema)

with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
df.toPandas()
else:
pdf = df.toPandas()
assert_frame_equal(origin, pdf)
pdf = df.toPandas()
assert_frame_equal(origin, pdf)

def test_createDataFrame_with_int_col_names(self):
for arrow_enabled in [True, False]:
@@ -779,42 +725,6 @@ def check_createDataFrame_with_int_col_names(self, arrow_enabled):
pdf_col_names = [str(c) for c in pdf.columns]
self.assertEqual(pdf_col_names, df.columns)

@unittest.skipIf(
not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
"will not fallback with pyarrow>=2.0",
)
def test_createDataFrame_fallback_enabled(self):
with QuietTest(self.sc):
with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
with warnings.catch_warnings(record=True) as warns:
# we want the warnings to appear even if this test is run from a subclass
warnings.simplefilter("always")
df = self.spark.createDataFrame(
pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>"
)
# Catch and check the last UserWarning.
user_warns = [
warn.message for warn in warns if isinstance(warn.message, UserWarning)
]
self.assertTrue(len(user_warns) > 0)
self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
self.assertEqual(df.collect(), [Row(a=[Row()])])

@unittest.skipIf(
not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
"will not fallback with pyarrow>=2.0",
)
def test_createDataFrame_fallback_disabled(self):
with QuietTest(self.sc):
with self.assertRaises(PySparkTypeError) as pe:
self.spark.createDataFrame(pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>")

self.check_error(
exception=pe.exception,
error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
message_parameters={"data_type": "Array of StructType"},
)

# Regression test for SPARK-23314
def test_timestamp_dst(self):
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
