Skip to content

Commit

Permalink
[FLINK-23213][python] Remove BeamTableStatefulPythonFunctionRunner an…
Browse files Browse the repository at this point in the history
…d BeamTableStatelessPythonFunctionRunner and merge them into BeamTablePythonFunctionRunner
  • Loading branch information
dianfu committed Jul 2, 2021
1 parent 68bc6c6 commit 18dca62
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -143,7 +143,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {

@Override
public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
return new BeamTableStatelessPythonFunctionRunner(
return new BeamTablePythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
userDefinedFunctionInputType,
Expand All @@ -152,6 +152,9 @@ public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
getUserDefinedFunctionsProto(),
jobOptions,
getFlinkMetricContainer(),
null,
null,
null,
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatefulPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -192,7 +192,7 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

@Override
public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
return new BeamTableStatefulPythonFunctionRunner(
return new BeamTablePythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
userDefinedFunctionInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import com.google.protobuf.GeneratedMessageV3;
import org.apache.beam.model.pipeline.v1.RunnerApi;

import java.util.Map;
Expand All @@ -38,17 +39,19 @@

/** A {@link BeamTablePythonFunctionRunner} used to execute Python functions in Table API. */
@Internal
public abstract class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {

private final RowType inputType;
private final RowType outputType;
private final GeneratedMessageV3 userDefinedFunctionProto;

BeamTablePythonFunctionRunner(
public BeamTablePythonFunctionRunner(
String taskName,
PythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
GeneratedMessageV3 userDefinedFunctionProto,
Map<String, String> jobOptions,
FlinkMetricContainer flinkMetricContainer,
KeyedStateBackend keyedStateBackend,
Expand All @@ -75,6 +78,12 @@ public abstract class BeamTablePythonFunctionRunner extends BeamPythonFunctionRu
outputMode);
this.inputType = Preconditions.checkNotNull(inputType);
this.outputType = Preconditions.checkNotNull(outputType);
this.userDefinedFunctionProto = Preconditions.checkNotNull(userDefinedFunctionProto);
}

@Override
protected byte[] getUserDefinedFunctionsProtoBytes() {
return userDefinedFunctionProto.toByteArray();
}

@Override
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.RowDataArrowSerializer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -43,8 +43,7 @@
* A {@link PassThroughPythonAggregateFunctionRunner} runner that just return the first input
* element with the same key as the execution results.
*/
public class PassThroughPythonAggregateFunctionRunner
extends BeamTableStatelessPythonFunctionRunner {
public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFunctionRunner {

private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;

Expand Down Expand Up @@ -84,6 +83,9 @@ public PassThroughPythonAggregateFunctionRunner(
jobOptions,
flinkMetricContainer,
null,
null,
null,
null,
0.0,
FlinkFnApi.CoderParam.DataType.ARROW,
FlinkFnApi.CoderParam.DataType.ARROW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -35,7 +35,7 @@
* A {@link PassThroughPythonScalarFunctionRunner} runner that just return the input elements as the
* execution results.
*/
public class PassThroughPythonScalarFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFunctionRunner {

private final List<byte[]> buffer;

Expand All @@ -58,6 +58,9 @@ public PassThroughPythonScalarFunctionRunner(
jobOptions,
flinkMetricContainer,
null,
null,
null,
null,
0.0,
FlinkFnApi.CoderParam.DataType.FLATTEN_ROW,
FlinkFnApi.CoderParam.DataType.FLATTEN_ROW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -32,10 +32,10 @@
import java.util.Map;

/**
* A {@link BeamTableStatelessPythonFunctionRunner} that emit each input element in inner join and
* emit null in left join when certain test conditions are met.
* A {@link BeamTablePythonFunctionRunner} that emit each input element in inner join and emit null
* in left join when certain test conditions are met.
*/
public class PassThroughPythonTableFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctionRunner {

private int num = 0;

Expand All @@ -60,6 +60,9 @@ public PassThroughPythonTableFunctionRunner(
jobOptions,
flinkMetricContainer,
null,
null,
null,
null,
0.0,
FlinkFnApi.CoderParam.DataType.FLATTEN_ROW,
FlinkFnApi.CoderParam.DataType.FLATTEN_ROW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatefulPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -39,8 +39,7 @@
* stream group aggregate operators. It will process the input data with the provided
* `processFunction`.
*/
public class PassThroughStreamAggregatePythonFunctionRunner
extends BeamTableStatefulPythonFunctionRunner {
public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePythonFunctionRunner {

private final List<byte[]> buffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatefulPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -37,7 +37,7 @@
* Python stream group window aggregate operators.
*/
public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner
extends BeamTableStatefulPythonFunctionRunner {
extends BeamTablePythonFunctionRunner {

private final PassThroughPythonStreamGroupWindowAggregateOperator operator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatefulPythonFunctionRunner;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
Expand All @@ -41,7 +41,7 @@
* `processFunction`.
*/
public class PassThroughStreamTableAggregatePythonFunctionRunner
extends BeamTableStatefulPythonFunctionRunner {
extends BeamTablePythonFunctionRunner {

private final List<byte[]> buffer;

Expand Down

0 comments on commit 18dca62

Please sign in to comment.