Skip to content

Commit

Permalink
[FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCo…
Browse files Browse the repository at this point in the history
…llection()
  • Loading branch information
afedulov authored and zentol committed Dec 5, 2023
1 parent e44efbf commit 18c03f2
Show file tree
Hide file tree
Showing 33 changed files with 174 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void testDisallowProgramConfigurationChanges(
// Add/mutate values in the configuration
environment.configure(programConfig);

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(MutatedConfigurationException.class)
.hasMessageContainingAll(
Expand Down Expand Up @@ -106,15 +106,15 @@ void testDisallowCheckpointStorage(
// Change the CheckpointConfig
environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(MutatedConfigurationException.class)
.hasMessageContainingAll(
CheckpointConfig.class.getSimpleName(), "setCheckpointStorage");

environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(MutatedConfigurationException.class)
.hasMessageContainingAll(
Expand Down Expand Up @@ -143,7 +143,7 @@ void testDisallowCheckpointStorageByConfiguration(
false,
Collections.emptyList());

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(MutatedConfigurationException.class)
.hasMessageContainingAll(
Expand All @@ -169,7 +169,7 @@ void testAllowCheckpointStorage(
// Change the CheckpointConfig
environment.getCheckpointConfig().setCheckpointStorage(allowedPath);

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(ExecutorReachedException.class);
}
Expand All @@ -186,7 +186,7 @@ void testNotModifiedCheckpointStorage(
final StreamContextEnvironment environment =
constructStreamContextEnvironment(clusterConfig, Collections.emptyList());

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(ExecutorReachedException.class);
}
Expand All @@ -202,7 +202,7 @@ void testForSinkTransformation(
final StreamContextEnvironment environment =
constructStreamContextEnvironment(clusterConfig, Collections.emptyList());

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(ExecutorReachedException.class);
}
Expand Down Expand Up @@ -235,7 +235,7 @@ void testAllowProgramConfigurationWildcards(
environment.configure(jobConfig);
environment.getConfig().setMaxParallelism(1024);

environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
.isInstanceOf(ExecutorReachedException.class);
assertThat(environment.getConfig().getGlobalJobParameters().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);

env.fromCollection(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>());
env.fromData(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>());
env.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;

import java.time.Duration;

Expand Down Expand Up @@ -76,12 +74,18 @@ public static void main(String[] args) throws Exception {

// create the data sources for both grades and salaries
DataStream<Tuple2<String, Integer>> grades =
GradeSource.getSource(env, rate)
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
env.fromSource(
WindowJoinSampleData.getGradeGeneratorSource(rate),
IngestionTimeWatermarkStrategy.create(),
"Grades Data Generator")
.setParallelism(1);

DataStream<Tuple2<String, Integer>> salaries =
SalarySource.getSource(env, rate)
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
env.fromSource(
WindowJoinSampleData.getSalaryGeneratorSource(rate),
IngestionTimeWatermarkStrategy.create(),
"Grades Data Generator")
.setParallelism(1);

// run the actual window join program
// for testability, this functionality is in a separate method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.utils.ThrottledIterator;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;

/** Sample data for the {@link WindowJoin} example. */
Expand All @@ -38,58 +36,27 @@ public class WindowJoinSampleData {
static final int SALARY_MAX = 10000;

/** Continuously generates (name, grade). */
public static class GradeSource implements Iterator<Tuple2<String, Integer>>, Serializable {

private final Random rnd = new Random(hashCode());

@Override
public boolean hasNext() {
return true;
}

@Override
public Tuple2<String, Integer> next() {
return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(GRADE_COUNT) + 1);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

public static DataStream<Tuple2<String, Integer>> getSource(
StreamExecutionEnvironment env, long rate) {
return env.fromCollection(
new ThrottledIterator<>(new GradeSource(), rate),
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
}
public static DataGeneratorSource<Tuple2<String, Integer>> getGradeGeneratorSource(
double elementsPerSecond) {
return getTupleGeneratorSource(GRADE_COUNT, elementsPerSecond);
}

/** Continuously generates (name, salary). */
public static class SalarySource implements Iterator<Tuple2<String, Integer>>, Serializable {

private final Random rnd = new Random(hashCode());

@Override
public boolean hasNext() {
return true;
}

@Override
public Tuple2<String, Integer> next() {
return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(SALARY_MAX) + 1);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
public static DataGeneratorSource<Tuple2<String, Integer>> getSalaryGeneratorSource(
double elementsPerSecond) {
return getTupleGeneratorSource(SALARY_MAX, elementsPerSecond);
}

public static DataStream<Tuple2<String, Integer>> getSource(
StreamExecutionEnvironment env, long rate) {
return env.fromCollection(
new ThrottledIterator<>(new SalarySource(), rate),
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
}
private static DataGeneratorSource<Tuple2<String, Integer>> getTupleGeneratorSource(
int maxValue, double elementsPerSecond) {
final Random rnd = new Random();
final GeneratorFunction<Long, Tuple2<String, Integer>> generatorFunction =
index -> new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(maxValue) + 1);

return new DataGeneratorSource<>(
generatorFunction,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(elementsPerSecond),
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public static void main(String[] args) throws Exception {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

final DataStream<Order> orderA =
env.fromCollection(
env.fromData(
Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
env.fromCollection(
env.fromData(
Arrays.asList(
new Order(2L, "pen", 3),
new Order(2L, "rubber", 3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static RowData validateRow(RowData rowData, RowType rowType) throws Excep
(Row) DataFormatConverters.getConverterForDataType(rowDataType).toExternal(rowData);
TypeInformation<Row> rowTypeInfo =
(TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(rowDataType);
DataStream<Row> rows = env.fromCollection(Collections.singletonList(row), rowTypeInfo);
DataStream<Row> rows = env.fromData(Collections.singletonList(row), rowTypeInfo);

Table table = tableEnv.fromDataStream(rows);
tableEnv.createTemporaryView("t", table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,7 @@ private static DataStream<OperatorState> getFinalOperatorStates(
}

DataStream<OperatorState> existingOperatorStates =
executionEnvironment
.fromCollection(existingOperators)
.name("existingOperatorStates");
executionEnvironment.fromData(existingOperators).name("existingOperatorStates");

existingOperatorStates
.flatMap(new StatePathExtractor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ private void bootstrapState(StateBackend backend, String savepointPath) throws E
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

StateBootstrapTransformation<Account> transformation =
OperatorTransformation.bootstrapWith(env.fromCollection(accounts))
OperatorTransformation.bootstrapWith(env.fromData(accounts))
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());

StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
OperatorTransformation.bootstrapWith(env.fromCollection(currencyRates))
OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
.transform(new CurrencyBootstrapFunction());

SavepointWriter writer =
Expand All @@ -141,15 +141,15 @@ private void validateBootstrap(StateBackend backend, String savepointPath) throw
}

DataStream<Account> stream =
env.fromCollection(accounts)
env.fromData(accounts)
.keyBy(acc -> acc.id)
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);

final CloseableIterator<Account> results = stream.collectAsync();

env.fromCollection(currencyRates)
.connect(env.fromCollection(currencyRates).broadcast(descriptor))
env.fromData(currencyRates)
.connect(env.fromData(currencyRates).broadcast(descriptor))
.process(new CurrencyValidationFunction())
.uid(CURRENCY_UID)
.sinkTo(new DiscardingSink<>());
Expand Down Expand Up @@ -192,7 +192,7 @@ private void validateModification(StateBackend backend, String savepointPath) th
}

DataStream<Account> stream =
sEnv.fromCollection(accounts)
sEnv.fromData(accounts)
.keyBy(acc -> acc.id)
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private static String bootstrapState(

private static StateBootstrapTransformation<Integer> bootstrap(
StreamExecutionEnvironment env, Collection<Integer> data) {
return OperatorTransformation.bootstrapWith(env.fromCollection(data))
return OperatorTransformation.bootstrapWith(env.fromData(data))
.keyBy(v -> v)
.transform(new StateBootstrapper());
}
Expand Down Expand Up @@ -194,7 +194,7 @@ private static void runAndValidate(
final List<CloseableIterator<Integer>> iterators = new ArrayList<>();
for (Tuple2<Collection<Integer>, String> assertion : assertions) {
iterators.add(
env.fromCollection(assertion.f0)
env.fromData(assertion.f0)
.keyBy(v -> v)
.map(new StateReader())
.uid(assertion.f1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testTumbleWindow() throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

DataStream<Tuple2<String, Integer>> bootstrapData =
env.fromCollection(WORDS)
env.fromData(WORDS)
.map(word -> Tuple2.of(word, 1))
.returns(TUPLE_TYPE_INFO)
.assignTimestampsAndWatermarks(
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testTumbleWindowWithEvictor() throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

DataStream<Tuple2<String, Integer>> bootstrapData =
env.fromCollection(WORDS)
env.fromData(WORDS)
.map(word -> Tuple2.of(word, 1))
.returns(TUPLE_TYPE_INFO)
.assignTimestampsAndWatermarks(
Expand Down Expand Up @@ -236,7 +236,7 @@ public void testSlideWindow() throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

DataStream<Tuple2<String, Integer>> bootstrapData =
env.fromCollection(WORDS)
env.fromData(WORDS)
.map(word -> Tuple2.of(word, 1), TUPLE_TYPE_INFO)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>noWatermarks()
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testSlideWindowWithEvictor() throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

DataStream<Tuple2<String, Integer>> bootstrapData =
env.fromCollection(WORDS)
env.fromData(WORDS)
.map(word -> Tuple2.of(word, 1))
.returns(TUPLE_TYPE_INFO)
.assignTimestampsAndWatermarks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ private void validateBootstrap(StateBackend backend, String savepointPath) throw
sEnv.setStateBackend(backend);

DataStream<Account> stream =
sEnv.fromCollection(accounts)
sEnv.fromData(accounts)
.keyBy(acc -> acc.id)
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);

CloseableIterator<Account> results = stream.collectAsync();

sEnv.fromCollection(currencyRates)
.connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
sEnv.fromData(currencyRates)
.connect(sEnv.fromData(currencyRates).broadcast(descriptor))
.process(new CurrencyValidationFunction())
.uid(CURRENCY_UID)
.sinkTo(new DiscardingSink<>());
Expand Down Expand Up @@ -195,7 +195,7 @@ private void validateModification(StateBackend backend, String savepointPath) th
sEnv.setStateBackend(backend);

DataStream<Account> stream =
sEnv.fromCollection(accounts)
sEnv.fromData(accounts)
.keyBy(acc -> acc.id)
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testJobName() {
config.set(PipelineOptions.NAME, jobName);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

env.fromCollection(Collections.singletonList("test")).sinkTo(new DiscardingSink<>());
env.fromData(Collections.singletonList("test")).sinkTo(new DiscardingSink<>());
StreamGraph streamGraph = env.getStreamGraph(true);
assertThat(streamGraph.getJobName()).isEqualTo(jobName);
}
Expand Down
Loading

0 comments on commit 18c03f2

Please sign in to comment.