Skip to content

Commit

Permalink
Flink: sync 1.16 with 1.17 for backports missed or not ported identically (apache#7403)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu authored Apr 24, 2023
1 parent 69c3249 commit 38cf2a1
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -59,14 +58,8 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
private final Schema projectedSchema;
private final String nameMapping;
private final boolean caseSensitive;

private final FlinkSourceFilter rowFilter;

// Convenience constructor: delegates to the primary constructor, passing
// Collections.emptyList() for the filter-expression argument (i.e. no
// row-level filters applied).
// NOTE(review): in this diff this constructor appears on the removed side —
// the commit drops it in favor of the explicit-filters constructor; callers
// were updated to pass Collections.emptyList() themselves.
public RowDataFileScanTaskReader(
Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
}

public RowDataFileScanTaskReader(
Schema tableSchema,
Schema projectedSchema,
Expand Down Expand Up @@ -148,7 +141,6 @@ private CloseableIterable<RowData> newIterable(
if (rowFilter != null) {
return CloseableIterable.filter(iter, rowFilter::filter);
}

return iter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
Expand Down Expand Up @@ -125,7 +126,8 @@ public RewriteMap(
this.encryptionManager = encryptionManager;
this.taskWriterFactory = taskWriterFactory;
this.rowDataReader =
new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
new RowDataFileScanTaskReader(
schema, schema, nameMapping, caseSensitive, Collections.emptyList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -58,28 +57,7 @@ public static AvroGenericRecordReaderFunction fromTable(Table table) {
false,
table.io(),
table.encryption(),
Collections.emptyList());
}

public AvroGenericRecordReaderFunction(
String tableName,
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
this(
tableName,
config,
tableSchema,
projectedSchema,
nameMapping,
caseSensitive,
io,
encryption,
Collections.emptyList());
null);
}

public AvroGenericRecordReaderFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
Expand All @@ -41,25 +40,6 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
private final EncryptionManager encryption;
private final List<Expression> filters;

// Convenience constructor: delegates to the full constructor, supplying
// Collections.emptyList() for the trailing filters parameter (no pushed-down
// filter expressions).
// NOTE(review): this diff appears to remove this overload; call sites now use
// the constructor that takes the List<Expression> filters argument explicitly.
public RowDataReaderFunction(
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
this(
config,
tableSchema,
projectedSchema,
nameMapping,
caseSensitive,
io,
encryption,
Collections.emptyList());
}

public RowDataReaderFunction(
ReadableConfig config,
Schema tableSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.BaseCombinedScanTask;
Expand Down Expand Up @@ -83,7 +84,8 @@ public static FileScanTask createFileTask(

public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
return new DataIterator<>(
new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
new RowDataFileScanTaskReader(
TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
combinedTask,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source.reader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -106,7 +107,8 @@ private IcebergSourceReader createReader(
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
new PlaintextEncryptionManager(),
Collections.emptyList());
return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -55,7 +56,8 @@ protected ReaderFunction<RowData> readerFunction() {
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
new PlaintextEncryptionManager(),
Collections.emptyList());
}

@Override
Expand Down

0 comments on commit 38cf2a1

Please sign in to comment.