[FLINK-32978][flink-core] Migrate the usage of RichFunction#open(Configuration parameters) to RichFunction#open(OpenContext openContext)

This closes apache#23058
WencongLiu authored and xintongsong committed Sep 12, 2023
1 parent 798a20a commit e935331
Showing 311 changed files with 923 additions and 839 deletions.
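
Every hunk below applies the same mechanical rewrite: the deprecated `RichFunction#open(Configuration parameters)` override becomes `RichFunction#open(OpenContext openContext)`, and the `org.apache.flink.configuration.Configuration` import is replaced by `org.apache.flink.api.common.functions.OpenContext`. The migration for a user-defined rich function looks roughly like the sketch below — the `CountingMapper` class and its cache field are illustrative placeholders, not code from this commit:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;

public class CountingMapper extends RichMapFunction<String, String> {

    // Transient state is rebuilt on every task start; open() is the
    // lifecycle hook for that, as in the docs examples below.
    private transient Map<String, Integer> counts;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // Before this commit the override was open(Configuration parameters);
        // the argument was rarely consulted, and runtime information remains
        // reachable through getRuntimeContext().
        counts = new HashMap<>();
    }

    @Override
    public String map(String value) throws Exception {
        counts.merge(value, 1, Integer::sum);
        return value + ":" + counts.get(value);
    }
}
```
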
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -80,7 +80,7 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
private transient DatabaseClient client;

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@@ -133,7 +133,7 @@ public class CountWithTimeoutFunction
private ValueState<CountWithTimestamp> state;

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}

2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/data_stream_api.md
@@ -1049,7 +1049,7 @@ joinedStream
ValueState<String> seen;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
seen = getRuntimeContext().getState(
new ValueStateDescriptor<>("seen", String.class));
}

2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -752,7 +752,7 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
}

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
deserializer.open(() -> getRuntimeContext().getMetricGroup());
}

6 changes: 3 additions & 3 deletions docs/content.zh/docs/libs/state_processor_api.md
@@ -161,7 +161,7 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
ListState<Long> updateTimes;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);

@@ -197,7 +197,7 @@ public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState
ListState<Long> updateTimes;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);

@@ -430,7 +430,7 @@ public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Ac
ValueState<Double> state;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total",Types.DOUBLE);
state = getRuntimeContext().getState(descriptor);
}

6 changes: 3 additions & 3 deletions docs/content.zh/docs/try-flink/datastream.md
@@ -377,7 +377,7 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
private transient ValueState<Boolean> flagState;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
@@ -463,7 +463,7 @@ private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
@@ -567,7 +567,7 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
private transient ValueState<Long> timerState;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);

2 changes: 1 addition & 1 deletion docs/content/docs/dev/datastream/operators/asyncio.md
@@ -95,7 +95,7 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
private transient DatabaseClient client;

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@@ -133,7 +133,7 @@ public class CountWithTimeoutFunction
private ValueState<CountWithTimestamp> state;

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}

2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/data_stream_api.md
@@ -1047,7 +1047,7 @@ joinedStream
ValueState<String> seen;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
seen = getRuntimeContext().getState(
new ValueStateDescriptor<>("seen", String.class));
}

2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/sourcesSinks.md
@@ -835,7 +835,7 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
}

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
deserializer.open(() -> getRuntimeContext().getMetricGroup());
}

6 changes: 3 additions & 3 deletions docs/content/docs/libs/state_processor_api.md
@@ -160,7 +160,7 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
ListState<Long> updateTimes;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);

@@ -196,7 +196,7 @@ public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState
ListState<Long> updateTimes;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);

@@ -429,7 +429,7 @@ public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Ac
ValueState<Double> state;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total",Types.DOUBLE);
state = getRuntimeContext().getState(descriptor);
}

6 changes: 3 additions & 3 deletions docs/content/docs/try-flink/datastream.md
@@ -325,7 +325,7 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
private transient ValueState<Boolean> flagState;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
@@ -411,7 +411,7 @@ private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
@@ -510,7 +510,7 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
private transient ValueState<Long> timerState;

@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);

@@ -797,9 +797,9 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.pro
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactOperator.java:0)
Method <org.apache.flink.connector.file.table.utils.CompactFileUtils.doSingleFileMove(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.IOUtils.copyBytes(java.io.InputStream, java.io.OutputStream, boolean)> in (CompactFileUtils.java:117)
Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.invoke(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.SinkFunction$Context)> calls method <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(java.lang.Object)> in (PrintTableSinkFactory.java:187)
- Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.open(int, int)> in (PrintTableSinkFactory.java:180)
- Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (PrintTableSinkFactory.java:180)
- Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.configuration.Configuration)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (PrintTableSinkFactory.java:180)
+ Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)> calls method <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.open(int, int)> in (PrintTableSinkFactory.java:180)
+ Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (PrintTableSinkFactory.java:180)
+ Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (PrintTableSinkFactory.java:180)
Static Initializer <org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE> in (CompactCoordinator.java:67)
Static Initializer <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE> in (CompactorOperator.java:85)
Static Initializer <org.apache.flink.connector.file.src.FileSourceSplitSerializer.<clinit>()> calls constructor <org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in (FileSourceSplitSerializer.java:42)
@@ -813,4 +813,4 @@ Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCo
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:51)
Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (ProcTimeCommitTrigger.java:46)
Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:46)
- Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:46)
+ Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:46)

@@ -20,13 +20,13 @@

import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -104,7 +104,7 @@ private void executeAndVerify(
stream.addSink(
new RichSinkFunction<Integer>() {
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
getRuntimeContext()
.addAccumulator("result", new ListAccumulator<Integer>());
}

@@ -18,7 +18,7 @@

package org.apache.flink.connector.file.table.batch.compact;

- import org.apache.flink.configuration.Configuration;
+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.connector.file.table.FileSystemCommitter;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.PartitionCommitPolicy;
@@ -81,7 +81,7 @@ public BatchPartitionCommitterSink(
}

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
partitionsFiles = new HashMap<>();
}

@@ -18,8 +18,8 @@

package org.apache.flink.connector.file.table.batch.compact;

+ import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemCommitterTest;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
@@ -77,7 +77,7 @@ public void testPartitionCommit() throws Exception {
new LinkedHashMap<>(),
identifier,
new PartitionCommitPolicyFactory(null, null, null, null));
- committerSink.open(new Configuration());
+ committerSink.open(DefaultOpenContext.INSTANCE);

List<Path> pathList1 = createFiles(path, "task-1/p1=0/p2=0/", "f1", "f2");
List<Path> pathList2 = createFiles(path, "task-2/p1=0/p2=0/", "f3");
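
Call sites that invoke the lifecycle method directly — in this commit, test code such as the hunk above — pass the `DefaultOpenContext.INSTANCE` singleton instead of `new Configuration()`. A minimal sketch of that pattern; the `TestLifecycleUtil` class and `openForTest` method are hypothetical names, not part of this commit:

```java
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.RichFunction;

final class TestLifecycleUtil {
    /** Drives the open() lifecycle of a rich function under test. */
    static void openForTest(RichFunction function) throws Exception {
        // Previously: function.open(new Configuration());
        function.open(DefaultOpenContext.INSTANCE);
    }
}
```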

@@ -18,8 +18,8 @@

package org.apache.flink.table.catalog.hive.factories;

+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.ConfigOption;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.CatalogLock;
@@ -127,7 +127,7 @@ private TestLockSink(ObjectIdentifier tableIdentifier, CatalogLock.Factory lockF
}

@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext openContext) throws Exception {
this.lock = lockFactory.create();
}

@@ -19,13 +19,14 @@
package org.apache.flink.hadoopcompatibility.mapred;

import org.apache.flink.annotation.Public;
+ import org.apache.flink.annotation.PublicEvolving;
+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
@@ -83,9 +84,10 @@ public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
this.jobConf = conf;
}

+ @PublicEvolving
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
this.mapper.configure(jobConf);

this.reporter = new HadoopDummyReporter();

@@ -19,7 +19,9 @@
package org.apache.flink.hadoopcompatibility.mapred;

import org.apache.flink.annotation.Public;
+ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.GroupCombineFunction;
+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -28,7 +30,6 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
@@ -105,9 +106,10 @@ public HadoopReduceCombineFunction(
}

@SuppressWarnings("unchecked")
+ @PublicEvolving
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
this.reducer.configure(jobConf);
this.combiner.configure(jobConf);

@@ -19,6 +19,8 @@
package org.apache.flink.hadoopcompatibility.mapred;

import org.apache.flink.annotation.Public;
+ import org.apache.flink.annotation.PublicEvolving;
+ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -27,7 +29,6 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
- import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
@@ -89,9 +90,10 @@ public HadoopReduceFunction(
}

@SuppressWarnings("unchecked")
+ @PublicEvolving
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
this.reducer.configure(jobConf);

this.reporter = new HadoopDummyReporter();