[FLINK-24859][doc][formats] Make new formats name coherent
echauchot authored and AHeise committed Dec 2, 2021
1 parent 8827f5f commit 575aa2d
Showing 21 changed files with 70 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources can
```java
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
-FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))

2 changes: 1 addition & 1 deletion docs/content/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources can
```java
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
-FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
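
For context, a minimal, self-contained sketch of how the renamed `TextLineInputFormat` plugs into the hybrid-source example from the docs above. The directory, topic name, bootstrap servers, and the `HybridSourceSketch` class itself are illustrative placeholders, not values from this commit:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        // Timestamp at which the bounded file data ends and the Kafka tail begins
        // (in the docs it is derived from the file input paths).
        long switchTimestamp = 1_638_316_800_000L;

        // Bounded history, read line by line with the renamed TextLineInputFormat.
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat(), new Path("/data/history"))
                        .build();

        // Unbounded tail from Kafka, starting just after the file data ends.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("events")
                        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Chain the two sources: files first, then Kafka.
        HybridSource<String> hybridSource =
                HybridSource.builder(fileSource).addSource(kafkaSource).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
        env.execute("hybrid-source-sketch");
    }
}
```

Apart from the format class name, the construction is unchanged from the pre-rename docs.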

@@ -41,7 +41,7 @@
*
* <pre>{@code
* FileSource<String> fileSource =
-* FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+* FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
* KafkaSource<String> kafkaSource =
* KafkaSource.<String>builder()
* .setBootstrapServers("localhost:9092")

@@ -42,19 +42,19 @@
* with their internal buffering of stream input and charset decoder state.
*/
@PublicEvolving
-public class TextLineFormat extends SimpleStreamFormat<String> {
+public class TextLineInputFormat extends SimpleStreamFormat<String> {

private static final long serialVersionUID = 1L;

public static final String DEFAULT_CHARSET_NAME = "UTF-8";

private final String charsetName;

-public TextLineFormat() {
+public TextLineInputFormat() {
this(DEFAULT_CHARSET_NAME);
}

-public TextLineFormat(String charsetName) {
+public TextLineInputFormat(String charsetName) {
this.charsetName = charsetName;
}

@@ -72,7 +72,7 @@ public TypeInformation<String> getProducedType() {

// ------------------------------------------------------------------------

-/** The actual reader for the {@code TextLineFormat}. */
+/** The actual reader for the {@code TextLineInputFormat}. */
public static final class Reader implements StreamFormat.Reader<String> {

private final BufferedReader reader;
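
To show the rename at a call site, here is a hedged sketch of a bounded text read with the new class name; the input directory and the `TextLinesSketch` wrapper are made-up placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TextLinesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The no-arg constructor defaults to UTF-8; a charset name can still be
        // passed explicitly, exactly as with the old TextLineFormat.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat("UTF-8"), new Path("/data/input"))
                        .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "text-lines");
        lines.print();

        env.execute("text-lines-sketch");
    }
}
```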

@@ -21,7 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -119,7 +119,8 @@ private void testBoundedTextFileSource(FailoverType failoverType) throws Exception
writeHiddenJunkFiles(testDir);

final FileSource<String> source =
-FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+FileSource.forRecordStreamFormat(
+new TextLineInputFormat(), Path.fromLocalFile(testDir))
.build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,7 +185,8 @@ private void testContinuousTextFileSource(FailoverType type) throws Exception {
final File testDir = TMP_FOLDER.newFolder();

final FileSource<String> source =
-FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+FileSource.forRecordStreamFormat(
+new TextLineInputFormat(), Path.fromLocalFile(testDir))
.monitorContinuously(Duration.ofMillis(5))
.build();

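
The continuous test above uses `monitorContinuously`; under the same assumptions as the previous sketch (placeholder path and class name, default charset), the unbounded variant looks roughly like this:

```java
import java.time.Duration;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

public class ContinuousTextLinesSketch {
    public static void main(String[] args) {
        // Keep watching the directory for new files every 10 seconds instead of
        // reading it once and terminating.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat(), new Path("/data/input"))
                        .monitorContinuously(Duration.ofSeconds(10))
                        .build();

        // With continuous monitoring enabled the source reports itself as unbounded.
        System.out.println(source.getBoundedness());
        // Hand the source to env.fromSource(...) exactly as in the bounded sketch.
    }
}
```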

@@ -20,7 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.core.fs.Path;

@@ -64,7 +64,7 @@ public void testNoSplitRequestWhenSplitRestored() throws Exception {
private static FileSourceReader<String, FileSourceSplit> createReader(
TestingReaderContext context) {
return new FileSourceReader<>(
-context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration());
+context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration());
}

private static FileSourceSplit createTestFileSplit() throws IOException {

@@ -25,8 +25,8 @@
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
+import org.apache.flink.connectors.hive.read.HiveInputFormat;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
@@ -310,7 +310,7 @@ private RowType getProducedRowType() {

private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
return LimitableBulkFormat.create(
-new HiveBulkFormatAdapter(
+new HiveInputFormat(
new JobConfWrapper(jobConf),
partitionKeys,
fullSchema.getFieldNames(),

@@ -89,8 +89,8 @@ public HiveCompactReaderFactory(
@Override
public CompactReader<RowData> create(CompactContext context) throws IOException {
HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem());
-HiveBulkFormatAdapter format =
-new HiveBulkFormatAdapter(
+HiveInputFormat format =
+new HiveInputFormat(
jobConfWrapper,
partitionKeys,
fieldNames,

@@ -27,7 +27,7 @@
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -62,11 +62,11 @@
* BulkFormat instances, because different hive partitions may need different BulkFormat to do the
* reading.
*/
-public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> {
+public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {

private static final long serialVersionUID = 1L;

-private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);
+private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class);

// schema evolution configs are not available in older versions of IOConstants, let's define
// them ourselves
@@ -83,7 +83,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit>
private final boolean useMapRedReader;
private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor;

-public HiveBulkFormatAdapter(
+public HiveInputFormat(
JobConfWrapper jobConfWrapper,
List<String> partitionKeys,
String[] fieldNames,
@@ -162,7 +162,7 @@ private RowType tableRowType() {
}
}

-private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
+private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() {
return hiveVersion.startsWith("1.")
? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
jobConfWrapper.conf(),
@@ -172,7 +172,7 @@ private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
computeSelectedFields(),
Collections.emptyList(),
DEFAULT_SIZE)
-: OrcColumnarRowFileInputFormat.createPartitionedFormat(
+: OrcColumnarRowInputFormat.createPartitionedFormat(
OrcShim.createShim(hiveVersion),
jobConfWrapper.conf(),
tableRowType(),

@@ -24,7 +24,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -88,7 +88,7 @@ public static void main(String[] args) throws Exception {
// Each file will be processed as plain text and split based on newlines.
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(
-new TextLineFormat(), params.getInputs().get());
+new TextLineInputFormat(), params.getInputs().get());

// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -23,7 +23,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -94,7 +94,7 @@ public static void main(String[] args) throws Exception {
// Each file will be processed as plain text and split based on newlines.
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(
-new TextLineFormat(), params.getInputs().get());
+new TextLineInputFormat(), params.getInputs().get());

// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -24,7 +24,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -107,7 +107,7 @@ public static void main(String[] args) throws Exception {
// Each file will be processed as plain text and split based on newlines.
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(
-new TextLineFormat(), params.getInputs().get());
+new TextLineInputFormat(), params.getInputs().get());

// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -25,7 +25,7 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
@@ -97,7 +97,7 @@ object TopSpeedWindowing {
case Some(input) =>
// Create a new file source that will read files from a given set of directories.
// Each file will be processed as plain text and split based on newlines.
-val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input:_*)
params.discoveryInterval.foreach { duration =>
// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
@@ -95,7 +95,7 @@ object WindowWordCount {
case Some(input) =>
// Create a new file source that will read files from a given set of directories.
// Each file will be processed as plain text and split based on newlines.
-val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input:_*)
params.discoveryInterval.foreach { duration =>
// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -24,7 +24,7 @@ import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
@@ -104,7 +104,7 @@ object WordCount {
case Some(input) =>
// Create a new file source that will read files from a given set of directories.
// Each file will be processed as plain text and split based on newlines.
-val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input:_*)
params.discoveryInterval.foreach { duration =>
// If a discovery interval is provided, the source will
// continuously watch the given directories for new files.

@@ -19,7 +19,7 @@
package org.apache.flink.orc.nohive;

import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.flink.orc.vector.ColumnBatchFactory;
@@ -42,16 +42,16 @@
import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;

-/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */
+/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */
public class OrcNoHiveColumnarRowInputFormat {
private OrcNoHiveColumnarRowInputFormat() {}

/**
-* Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+* Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
* generated by split.
*/
public static <SplitT extends FileSourceSplit>
-OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
Configuration hadoopConfig,
RowType tableType,
List<String> partitionKeys,
@@ -84,7 +84,7 @@ OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
return new VectorizedColumnBatch(vectors);
};

-return new OrcColumnarRowFileInputFormat<>(
+return new OrcColumnarRowInputFormat<>(
new OrcNoHiveShim(),
hadoopConfig,
convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),

@@ -55,15 +55,15 @@
* fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be
* different and types of extra fields need to be added.
*/
-public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
+public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {

private static final long serialVersionUID = 1L;

private final ColumnBatchFactory<BatchT, SplitT> batchFactory;
private final RowType projectedOutputType;

-public OrcColumnarRowFileInputFormat(
+public OrcColumnarRowInputFormat(
final OrcShim<BatchT> shim,
final Configuration hadoopConfig,
final TypeDescription schema,
@@ -126,11 +126,11 @@ public RecordIterator<RowData> convertAndGetIterator(
}

/**
-* Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+* Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
* generated by split.
*/
public static <SplitT extends FileSourceSplit>
-OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
OrcShim<VectorizedRowBatch> shim,
Configuration hadoopConfig,
RowType tableType,
@@ -164,7 +164,7 @@ OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
return new VectorizedColumnBatch(vectors);
};

-return new OrcColumnarRowFileInputFormat<>(
+return new OrcColumnarRowInputFormat<>(
shim,
hadoopConfig,
convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),