[SPARK-41533][CONNECT] Proper Error Handling for Spark Connect Server / Client

### What changes were proposed in this pull request?
This PR improves the error handling on both the Spark Connect server and client side. First, this patch moves the error handling logic on the server into a common error handler partial function that differentiates between internal Spark errors and other runtime errors.

For custom Spark exceptions, the actual internal error is wrapped into a Google RPC Status and sent as trailing metadata to the client.

On the client side, the error handling is similarly moved into a common function. All GRPC errors are wrapped into custom exceptions so that users are not confronted with raw, confusing GRPC errors. If available, the attached RPC status is extracted and added to the exception.
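
As an illustration of how such a client can recover the status the server attaches as trailing metadata, here is a minimal Python sketch using `grpcio-status` and `googleapis-common-protos` (the same packages this PR adds as dependencies). The exception class and helper name are hypothetical and not the identifiers used in this PR:

```python
# Illustrative sketch only; `SparkConnectException` and `_handle_rpc_error`
# are hypothetical names, not taken from this PR.
import grpc
from grpc_status import rpc_status          # provided by grpcio-status
from google.rpc import error_details_pb2    # provided by googleapis-common-protos


class SparkConnectException(Exception):
    """Hypothetical wrapper that hides raw GRPC errors from the user."""

    def __init__(self, message, reason=None, metadata=None):
        super().__init__(message)
        self.reason = reason
        self.metadata = metadata or {}


def _handle_rpc_error(rpc_error: grpc.RpcError) -> SparkConnectException:
    # The server packs the internal error into a google.rpc.Status and sends
    # it as trailing metadata; from_call() returns None when it is absent.
    status = rpc_status.from_call(rpc_error)
    if status is None:
        return SparkConnectException(rpc_error.details())

    reason, metadata = None, {}
    for detail in status.details:
        if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
            info = error_details_pb2.ErrorInfo()
            detail.Unpack(info)
            reason, metadata = info.reason, dict(info.metadata)
    return SparkConnectException(status.message, reason=reason, metadata=metadata)
```

A call site would then catch `grpc.RpcError` around the RPC and re-raise the converted exception instead of the raw GRPC error.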

Lastly, this patch adds basic logging functionality that can be enabled using the environment variable `SPARK_CONNECT_LOG_LEVEL`, which can be set to `info`, `warn`, `error`, or `debug`.
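
A minimal sketch of how such an environment-variable-driven logger can be configured on the Python side (the logger name `spark_connect` and the fallback behaviour are assumptions for illustration, not the exact code from this PR):

```python
# Minimal sketch, assuming a dedicated module-level logger; the logger name
# and defaults here are illustrative only.
import logging
import os


def _configure_logger() -> logging.Logger:
    logger = logging.getLogger("spark_connect")
    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "")
    if level:
        # Map the documented values (info, warn, error, debug) onto the
        # standard logging levels; unknown values fall back to DEBUG.
        logger.setLevel(getattr(logging, level.upper(), logging.DEBUG))
        logger.addHandler(logging.StreamHandler())
    else:
        # Logging stays disabled unless the variable is set.
        logger.disabled = True
    return logger
```

With this in place, running a client with `SPARK_CONNECT_LOG_LEVEL=debug` set would emit debug-level messages to stderr.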

### Why are the changes needed?
Usability

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes apache#39212 from grundprinzip/SPARK-41533.

Authored-by: Martin Grund <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
grundprinzip authored and HyukjinKwon committed Dec 29, 2022
1 parent a28f08d commit 10b4484
Showing 17 changed files with 364 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -589,7 +589,7 @@ jobs:
# See also https://issues.apache.org/jira/browse/SPARK-38279.
python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
python3.9 -m pip install ipython_genutils # See SPARK-38517
python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==3.19.5' 'mypy-protobuf==3.3.0'
python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==3.19.5' 'mypy-protobuf==3.3.0' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0'
python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
apt-get update -y
apt-get install -y ruby ruby-dev
2 changes: 1 addition & 1 deletion connector/connect/README.md
@@ -90,7 +90,7 @@ To use the release version of Spark Connect:

### Generate proto generated files for the Python client
1. Install `buf version 1.11.0`: https://docs.buf.build/installation
2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0`
2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0 googleapis-common-protos==1.56.4 grpcio-status==1.48.1`
3. Run `./connector/connect/dev/generate_protos.sh`
4. (Optional) Check `./dev/check-codegen-python.py`

@@ -20,19 +20,23 @@ package org.apache.spark.sql.connect.service
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder
import com.google.protobuf.{Any => ProtoAny}
import com.google.rpc.{Code => RPCCode, ErrorInfo, Status => RPCStatus}
import io.grpc.{Server, Status}
import io.grpc.netty.NettyServerBuilder
import io.grpc.protobuf.StatusProto
import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.stub.StreamObserver

import org.apache.spark.SparkEnv
import org.apache.spark.{SparkEnv, SparkThrowable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExplainMode, ExtendedMode, FormattedMode, SimpleMode}
@@ -49,6 +53,71 @@ class SparkConnectService(debug: Boolean)
extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
with Logging {

private def buildStatusFromThrowable[A <: Throwable with SparkThrowable](st: A): RPCStatus = {
val t = Option(st.getCause).getOrElse(st)
RPCStatus
.newBuilder()
.setCode(RPCCode.INTERNAL_VALUE)
.addDetails(
ProtoAny.pack(
ErrorInfo
.newBuilder()
.setReason(t.getClass.getName)
.setDomain("org.apache.spark")
.build()))
.setMessage(t.getLocalizedMessage)
.build()
}

/**
* Common exception handling function for the Analysis and Execution methods. Closes the stream
* after the error has been sent.
*
* @param opType
* String value indicating the operation type (analysis, execution)
* @param observer
* The GRPC response observer.
* @tparam V
* @return
*/
private def handleError[V](
opType: String,
observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = {
case ae: AnalysisException =>
logError(s"Error during: $opType", ae)
val status = RPCStatus
.newBuilder()
.setCode(RPCCode.INTERNAL_VALUE)
.addDetails(
ProtoAny.pack(
ErrorInfo
.newBuilder()
.setReason(ae.getClass.getName)
.setDomain("org.apache.spark")
.putMetadata("message", ae.getSimpleMessage)
.putMetadata("plan", Option(ae.plan).flatten.map(p => s"$p").getOrElse(""))
.build()))
.setMessage(ae.getLocalizedMessage)
.build()
observer.onError(StatusProto.toStatusRuntimeException(status))
case st: SparkThrowable =>
logError(s"Error during: $opType", st)
val status = buildStatusFromThrowable(st)
observer.onError(StatusProto.toStatusRuntimeException(status))
case NonFatal(nf) =>
logError(s"Error during: $opType", nf)
val status = RPCStatus
.newBuilder()
.setCode(RPCCode.INTERNAL_VALUE)
.setMessage(nf.getLocalizedMessage)
.build()
observer.onError(StatusProto.toStatusRuntimeException(status))
case e: Throwable =>
logError(s"Error during: $opType", e)
observer.onError(
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
}

/**
* This is the main entry method for Spark Connect and all calls to execute a plan.
*
@@ -64,12 +133,7 @@ class SparkConnectService(debug: Boolean)
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
try {
new SparkConnectStreamHandler(responseObserver).handle(request)
} catch {
case e: Throwable =>
log.error("Error executing plan.", e)
responseObserver.onError(
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
}
} catch handleError("execute", observer = responseObserver)
}

/**
@@ -114,12 +178,7 @@ class SparkConnectService(debug: Boolean)
response.setClientId(request.getClientId)
responseObserver.onNext(response.build())
responseObserver.onCompleted()
} catch {
case e: Throwable =>
log.error("Error analyzing plan.", e)
responseObserver.onError(
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
}
} catch handleError("analyze", observer = responseObserver)
}

def handleAnalyzePlanRequest(
@@ -18,7 +18,6 @@
package org.apache.spark.sql.connect.service

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
@@ -128,12 +127,8 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
}
partitions(currentPartitionId) = null

error.foreach {
case NonFatal(e) =>
responseObserver.onError(e)
logError("Error while processing query.", e)
return
case other => throw other
error.foreach { case other =>
throw other
}
part
}
@@ -18,12 +18,12 @@ package org.apache.spark.sql.connect.planner

import scala.collection.mutable

import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{BigIntVector, Float8Vector}
import org.apache.arrow.vector.ipc.ArrowStreamReader

import org.apache.spark.SparkException
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.plans._
@@ -185,7 +185,7 @@ class SparkConnectServiceSuite extends SharedSparkSession {
}

override def onError(throwable: Throwable): Unit = {
assert(throwable.isInstanceOf[SparkException])
assert(throwable.isInstanceOf[StatusRuntimeException])
}

override def onCompleted(): Unit = {
2 changes: 1 addition & 1 deletion dev/create-release/spark-rm/Dockerfile
@@ -42,7 +42,7 @@ ARG APT_INSTALL="apt-get install --no-install-recommends -y"
# We should use the latest Sphinx version once this is fixed.
# TODO(SPARK-35375): Jinja2 3.0.0+ causes error when building with Sphinx.
# See also https://issues.apache.org/jira/browse/SPARK-35375.
ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.1.2 numpy==1.19.4 pydata_sphinx_theme==0.4.1 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==2.11.3 twine==3.4.1 sphinx-plotly-directive==0.1.3 pandas==1.1.5 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.48.1 protobuf==4.21.6"
ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.1.2 numpy==1.19.4 pydata_sphinx_theme==0.4.1 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==2.11.3 twine==3.4.1 sphinx-plotly-directive==0.1.3 pandas==1.1.5 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.48.1 protobuf==4.21.6 grpcio-status==1.48.1 googleapis-common-protos==1.56.4"
ARG GEM_PKGS="bundler:2.2.9"

# Install extra needed repos and refresh.
2 changes: 1 addition & 1 deletion dev/infra/Dockerfile
@@ -68,4 +68,4 @@ RUN pypy3 -m pip install numpy 'pandas<=1.5.2' scipy coverage matplotlib
RUN python3.9 -m pip install numpy pyarrow 'pandas<=1.5.2' scipy unittest-xml-reporting plotly>=4.8 sklearn 'mlflow>=1.0' coverage matplotlib openpyxl 'memory-profiler==0.60.0'

# Add Python deps for Spark Connect.
RUN python3.9 -m pip install grpcio protobuf
RUN python3.9 -m pip install grpcio protobuf googleapis-common-protos grpcio-status
2 changes: 2 additions & 0 deletions dev/lint-python
@@ -69,6 +69,7 @@ function mypy_annotation_test {

echo "starting mypy annotations test..."
MYPY_REPORT=$( ($MYPY_BUILD \
--namespace-packages \
--config-file python/mypy.ini \
--cache-dir /tmp/.mypy_cache/ \
python/pyspark) 2>&1)
@@ -127,6 +128,7 @@ function mypy_examples_test {
echo "starting mypy examples test..."

MYPY_REPORT=$( (MYPYPATH=python $MYPY_BUILD \
--namespace-packages \
--config-file python/mypy.ini \
--exclude "mllib/*" \
examples/src/main/python/) 2>&1)
4 changes: 4 additions & 0 deletions dev/requirements.txt
@@ -50,7 +50,11 @@ black==22.6.0

# Spark Connect (required)
grpcio==1.48.1
grpcio-status==1.48.1
protobuf==3.19.5
googleapis-common-protos==1.56.4

# Spark Connect python proto generation plugin (optional)
mypy-protobuf==3.3.0
googleapis-common-protos-stubs==2.2.0
grpc-stubs==1.24.11
20 changes: 11 additions & 9 deletions python/docs/source/getting_started/install.rst
@@ -153,15 +153,17 @@ To install PySpark from source, refer to |building_spark|_.

Dependencies
------------
============= ========================= ======================================================================================
Package Minimum supported version Note
============= ========================= ======================================================================================
`py4j` 0.10.9.7 Required
`pandas` 1.0.5 Required for pandas API on Spark and Spark Connect; Optional for Spark SQL
`pyarrow` 1.0.0 Required for pandas API on Spark and Spark Connect; Optional for Spark SQL
`numpy` 1.15 Required for pandas API on Spark and MLLib DataFrame-based API; Optional for Spark SQL
`grpc` 1.48.1 Required for Spark Connect
============= ========================= ======================================================================================
========================== ========================= ======================================================================================
Package Minimum supported version Note
========================== ========================= ======================================================================================
`py4j` 0.10.9.7 Required
`pandas` 1.0.5 Required for pandas API on Spark and Spark Connect; Optional for Spark SQL
`pyarrow` 1.0.0 Required for pandas API on Spark and Spark Connect; Optional for Spark SQL
`numpy` 1.15 Required for pandas API on Spark and MLLib DataFrame-based API; Optional for Spark SQL
`grpc` 1.48.1 Required for Spark Connect
`grpcio-status` 1.48.1 Required for Spark Connect
`googleapis-common-protos` 1.56.4 Required for Spark Connect
========================== ========================= ======================================================================================

Note that PySpark requires Java 8 or later with ``JAVA_HOME`` properly set.
If using JDK 11, set ``-Dio.netty.tryReflectionSetAccessible=true`` for Arrow related features and refer
1 change: 1 addition & 0 deletions python/mypy.ini
@@ -22,6 +22,7 @@ disallow_untyped_defs = True
show_error_codes = True
warn_unused_ignores = True
warn_redundant_casts = True
namespace_packages = True

[mypy-pyspark.sql.connect.proto.*]
ignore_errors = True