[SPARK-45130][CONNECT][ML][PYTHON] Avoid Spark connect ML model to change input pandas dataframe

### What changes were proposed in this pull request?

Currently, to avoid copying data, the Spark connect ML model appends prediction columns directly to the input pandas dataframe. Instead, we can shallow-copy the input with `pandas_df.copy(deep=False)` and append the prediction columns to the copy. This makes the API easier for users to work with.
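
As a standalone sketch of the idea (plain pandas, not the actual `pyspark.ml.connect` code): `copy(deep=False)` shares the existing column data, so no values are duplicated, yet a column appended to the copy never shows up in the caller's dataframe.

```python
import pandas as pd

df = pd.DataFrame({"features": [1.0, 2.0, 3.0]})

copied = df.copy(deep=False)      # shares column data with `df`, copies no values
copied["prediction"] = [0, 1, 1]  # appended only to the copy

print(list(df.columns))      # ['features'] -- the input is left intact
print(list(copied.columns))  # ['features', 'prediction']
```

Note that mutating an existing shared column in place would still be visible through the original; that is acceptable here because `transform` only appends new columns.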

### Why are the changes needed?

This makes the `pyspark.ml.connect` model's `transform` method behave more like the `pyspark.ml` model's, i.e., the input dataframe is left intact after `transform` is called. Otherwise users might be surprised by the new behavior and have to change more code to migrate their workloads to `pyspark.ml.connect`.

### Does this PR introduce _any_ user-facing change?

Yes.
Previous behavior:
In `pyspark.ml.connect`, `model.transform` appends new columns to the input pandas dataframe and returns the input dataframe object.

Changed behavior:
In `pyspark.ml.connect`, `model.transform` makes a shallow copy of the input pandas dataframe, appends the new columns to the copy, and returns the copy.
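
With the change, a caller observes behavior along these lines (hypothetical `model` fitted via `pyspark.ml.connect` and Spark dataframe `eval_df`; the column name `prediction` is illustrative):

```python
input_df = eval_df.toPandas()

result = model.transform(input_df)

assert result is not input_df          # a shallow copy is returned
assert "prediction" not in input_df    # the input dataframe is untouched
assert "prediction" in result.columns  # prediction columns live on the copy
```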

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42887 from WeichenXu123/spark-ml-connect-model-avoid-change-input-dataframe.

Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
WeichenXu123 authored and zhengruifeng committed Sep 18, 2023

1 parent d5ff04d commit 99a979d
Showing 5 changed files with 25 additions and 8 deletions.
8 changes: 4 additions & 4 deletions python/pyspark/ml/connect/base.py
@@ -151,11 +151,11 @@ def transform(
     ) -> Union[DataFrame, pd.DataFrame]:
         """
         Transforms the input dataset.
-        The dataset can be either pandas dataframe or spark dataframe,
+        The dataset can be either pandas dataframe or spark dataframe
         if it is a spark DataFrame, the result of transformation is a new spark DataFrame
-        that contains all existing columns and output columns with names.
-        if it is a pandas DataFrame, the input pandas dataframe is appended with output
-        columns in place.
+        that contains all existing columns and output columns with names,
+        If it is a pandas DataFrame, the result of transformation is a shallow copy
+        of the input pandas dataframe with output columns with names.
         Note: Transformers does not allow output column having the same name with
         existing columns.
1 change: 1 addition & 0 deletions python/pyspark/ml/connect/util.py
@@ -147,6 +147,7 @@ def transform_dataframe_column(
     output_col_name, spark_udf_return_type = output_cols[0]

     if isinstance(dataframe, pd.DataFrame):
+        dataframe = dataframe.copy(deep=False)
         result_data = transform_fn(*[dataframe[col_name] for col_name in input_cols])
         if isinstance(result_data, pd.Series):
             assert len(output_cols) == 1
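
The reason for `deep=False` rather than a plain (deep) `copy()` is cost: a shallow copy reuses the existing column buffers instead of duplicating them. A quick standalone check (not part of this patch) makes that visible:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"features": np.arange(1_000_000, dtype=np.float64)})
copied = df.copy(deep=False)

# The copy's column buffer is the same memory as the original's,
# so no values were duplicated (a deep copy would print False).
print(np.shares_memory(df["features"].to_numpy(), copied["features"].to_numpy()))
```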
7 changes: 6 additions & 1 deletion python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
@@ -33,6 +33,7 @@
         LogisticRegression as LORV2,
         LogisticRegressionModel as LORV2Model,
     )
+    import pandas as pd


 class ClassificationTestsMixin:
@@ -81,7 +82,11 @@ def test_binary_classes_logistic_regression(self):

         result = model.transform(eval_df1).toPandas()
         self._check_result(result, expected_predictions, expected_probabilities)
-        local_transform_result = model.transform(eval_df1.toPandas())
+        pandas_eval_df1 = eval_df1.toPandas()
+        pandas_eval_df1_copy = pandas_eval_df1.copy()
+        local_transform_result = model.transform(pandas_eval_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(pandas_eval_df1, pandas_eval_df1_copy)
         self._check_result(local_transform_result, expected_predictions, expected_probabilities)

         model.set(model.probabilityCol, "")
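
The same snapshot-and-compare assertion recurs in each of the test files here; distilled into a helper, the pattern looks like this (a sketch; `assert_transform_does_not_mutate` is not a helper from this patch, and `model` is any object whose `transform` accepts a pandas dataframe):

```python
import pandas as pd

def assert_transform_does_not_mutate(model, input_df: pd.DataFrame) -> None:
    snapshot = input_df.copy()          # deep copy as the reference
    model.transform(input_df)           # the result is validated elsewhere
    pd.testing.assert_frame_equal(input_df, snapshot)
```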
10 changes: 8 additions & 2 deletions python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -25,13 +25,15 @@
 from pyspark.sql import SparkSession
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message

+
 if should_test_connect:
     from pyspark.ml.connect.feature import (
         MaxAbsScaler,
         MaxAbsScalerModel,
         StandardScaler,
         StandardScalerModel,
     )
+    import pandas as pd


 class FeatureTestsMixin:
@@ -57,8 +59,10 @@ def test_max_abs_scaler(self):

         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
         assert list(local_transform_result.columns) == ["features", "scaled_features"]

         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
@@ -110,8 +114,10 @@ def test_standard_scaler(self):

         local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
+        local_df1_copy = local_df1.copy()
         local_transform_result = local_fit_model.transform(local_df1)
-        assert id(local_transform_result) == id(local_df1)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_df1, local_df1_copy)
         assert list(local_transform_result.columns) == ["features", "scaled_features"]

         np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)
7 changes: 6 additions & 1 deletion python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
@@ -26,6 +26,7 @@
 from pyspark.ml.connect.feature import StandardScaler
 from pyspark.ml.connect.classification import LogisticRegression as LORV2
 from pyspark.ml.connect.pipeline import Pipeline
+import pandas as pd


 class PipelineTestsMixin:
@@ -81,7 +82,11 @@ def test_pipeline(self):
         model2 = pipeline2.fit(train_dataset)
         result2 = model2.transform(eval_dataset).toPandas()
         self._check_result(result2, expected_predictions, expected_probabilities)
-        local_transform_result2 = model2.transform(eval_dataset.toPandas())
+        local_eval_dataset = eval_dataset.toPandas()
+        local_eval_dataset_copy = local_eval_dataset.copy()
+        local_transform_result2 = model2.transform(local_eval_dataset)
+        # assert that `transform` doesn't mutate the input dataframe.
+        pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy)
         self._check_result(local_transform_result2, expected_predictions, expected_probabilities)

         with tempfile.TemporaryDirectory() as tmp_dir:
