Skip to content

Commit

Permalink
[FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/re…
Browse files Browse the repository at this point in the history
…adTextFile() methods
  • Loading branch information
afedulov authored and MartijnVisser committed Jun 30, 2022
1 parent 0e95f5a commit 3e73fb5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.flink.streaming.examples.iteration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
Expand Down Expand Up @@ -83,7 +86,13 @@ public static void main(String[] args) throws Exception {
// create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
FileSource<String> fileSource =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path(params.get("input")))
.build();
inputStream =
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "Tuples Source")
.map(new FibonacciInputMap());
} else {
System.out.println("Executing Iterate example with default input data set.");
System.out.println("Use --input to specify file input.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
Expand Down Expand Up @@ -70,23 +73,29 @@ public static void main(String[] args) throws Exception {
env.getConfig().setGlobalJobParameters(params);

// get input data
DataStream<String> text;
DataStream<String> textWithTimestampAndWatermark;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
FileSource<String> fileSource =
FileSource.forRecordStreamFormat(
new TextLineInputFormat(), new Path(params.get("input")))
.build();
textWithTimestampAndWatermark =
env.fromSource(
fileSource, IngestionTimeWatermarkStrategy.create(), "Words Source");
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
DataStreamSource<String> text = env.fromElements(WordCountData.WORDS);
// We assign the WatermarkStrategy after creating the source because
// StreamExecutionEnvironment#fromElements() methods currently do not accept
// WatermarkStrategies. In a real-world job you should integrate the WatermarkStrategy
// in the source as shown above for the FileSource.
textWithTimestampAndWatermark =
text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
}

// We assign the WatermarkStrategy after creating the source. In a real-world job you
// should integrate the WatermarkStrategy in the source. The Kafka source allows this,
// for example.
SingleOutputStreamOperator<String> textWithTimestampAndWatermark =
text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());

SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
textWithTimestampAndWatermark.process(new Tokenizer());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,17 @@ private <OUT> DataStreamSource<OUT> fromParallelCollection(
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
* "hdfs://host:port/file/path").
* @return The data stream that represents the data read from the given file as text lines
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
* example of reading a file using a simple {@code TextLineInputFormat}:
* <pre>{@code
* FileSource<String> source =
* FileSource.forRecordStreamFormat(
* new TextLineInputFormat(), new Path("/foo/bar"))
* .build();
* }</pre>
*/
@Deprecated
public DataStreamSource<String> readTextFile(String filePath) {
    // Delegate to the overload that takes an explicit charset name, using UTF-8.
    final String charsetName = "UTF-8";
    return readTextFile(filePath, charsetName);
}
Expand All @@ -1365,7 +1375,17 @@ public DataStreamSource<String> readTextFile(String filePath) {
* "hdfs://host:port/file/path")
* @param charsetName The name of the character set used to read the file
* @return The data stream that represents the data read from the given file as text lines
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
* example of reading a file using a simple {@code TextLineInputFormat}:
* <pre>{@code
* FileSource<String> source =
* FileSource.forRecordStreamFormat(
* new TextLineInputFormat("UTF-8"), new Path("/foo/bar"))
* .build();
* }</pre>
*/
@Deprecated
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(filePath),
Expand Down Expand Up @@ -1402,7 +1422,17 @@ public DataStreamSource<String> readTextFile(String filePath, String charsetName
* @param inputFormat The input format used to create the data stream
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
* example of reading a file using a simple {@code TextLineInputFormat}:
* <pre>{@code
* FileSource<String> source =
* FileSource.forRecordStreamFormat(
* new TextLineInputFormat(), new Path("/foo/bar"))
* .build();
* }</pre>
*/
@Deprecated
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
    // Delegate to the four-argument overload, passing FileProcessingMode.PROCESS_ONCE
    // as the processing mode and -1 as the final argument.
    final FileProcessingMode watchType = FileProcessingMode.PROCESS_ONCE;
    return readFile(inputFormat, filePath, watchType, -1);
}
Expand Down Expand Up @@ -1482,7 +1512,18 @@ public <OUT> DataStreamSource<OUT> readFile(
* millis) between consecutive path scans
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
* example of reading a file using a simple {@code TextLineInputFormat}:
* <pre>{@code
* FileSource<String> source =
* FileSource.forRecordStreamFormat(
* new TextLineInputFormat(), new Path("/foo/bar"))
* .monitorContinuously(Duration.ofSeconds(10))
* .build();
* }</pre>
*/
@Deprecated
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(
FileInputFormat<OUT> inputFormat,
Expand Down Expand Up @@ -1557,7 +1598,18 @@ public DataStream<String> readFileStream(
* millis) between consecutive path scans
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
* example of reading a file using a simple {@code TextLineInputFormat}:
* <pre>{@code
* FileSource<String> source =
* FileSource.forRecordStreamFormat(
* new TextLineInputFormat(), new Path("/foo/bar"))
* .monitorContinuously(Duration.ofSeconds(10))
* .build();
* }</pre>
*/
@Deprecated
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(
FileInputFormat<OUT> inputFormat,
Expand Down

0 comments on commit 3e73fb5

Please sign in to comment.