Spark 3.3, Flink 1.16: Handle ResolvingFileIO while determining locality (apache#6655)

Co-authored-by: Prashant Singh <[email protected]>
singhpk234 and Prashant Singh authored Feb 4, 2023
1 parent 6e19b53 commit bc6862b
Showing 4 changed files with 56 additions and 45 deletions.
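At a glance: before this commit, Spark and Flink each special-cased HadoopFileIO when deciding whether to expose split locality, so tables opened through ResolvingFileIO never reported block locations even when backed by HDFS. The patch centralizes the decision in Util.mayHaveBlockLocations. A minimal sketch of the resulting call pattern, using only names from the diff below (the wrapper class is hypothetical):

import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.Util;

class LocalitySketch {
  // Shared entry point after this patch: handles HadoopFileIO directly and
  // unwraps ResolvingFileIO by resolving the location's scheme to a FileIO class.
  static boolean exposeLocality(Table table) {
    return Util.mayHaveBlockLocations(table.io(), table.location());
  }
}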
45 changes: 41 additions & 4 deletions core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -35,6 +35,8 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +45,8 @@ public class Util {

public static final String VERSION_HINT_FILENAME = "version-hint.text";

private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private static final Logger LOG = LoggerFactory.getLogger(Util.class);

private Util() {}
@@ -84,16 +88,49 @@ public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
}

public static boolean mayHaveBlockLocations(FileIO io, String location) {
if (usesHadoopFileIO(io, location)) {
InputFile inputFile = io.newInputFile(location);
if (inputFile instanceof HadoopInputFile) {
String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
return LOCALITY_WHITELIST_FS.contains(scheme);

} else {
return false;
}
}

return false;
}

private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {
InputFile inputFile = io.newInputFile(task.file().path().toString());
if (inputFile instanceof HadoopInputFile) {
HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile;
return hadoopInputFile.getBlockLocations(task.start(), task.length());
String location = task.file().path().toString();
if (usesHadoopFileIO(io, location)) {
InputFile inputFile = io.newInputFile(location);
if (inputFile instanceof HadoopInputFile) {
return ((HadoopInputFile) inputFile).getBlockLocations(task.start(), task.length());

} else {
return HadoopInputFile.NO_LOCATION_PREFERENCE;
}
} else {
return HadoopInputFile.NO_LOCATION_PREFERENCE;
}
}

private static boolean usesHadoopFileIO(FileIO io, String location) {
if (io instanceof HadoopFileIO) {
return true;

} else if (io instanceof ResolvingFileIO) {
ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
return HadoopFileIO.class.isAssignableFrom(resolvingFileIO.ioClass(location));

} else {
return false;
}
}

/**
* From Apache Spark
*
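A rough illustration of the new helper's behavior, assuming a plain HadoopFileIO and hypothetical paths; only the hdfs scheme is whitelisted, so other filesystems report no locality:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;

class BlockLocationsDemo {
  public static void main(String[] args) {
    HadoopFileIO io = new HadoopFileIO(new Configuration());
    // "file" is not in LOCALITY_WHITELIST_FS, so this prints false.
    System.out.println(Util.mayHaveBlockLocations(io, "file:///tmp/t/data.parquet"));
    // "hdfs" is whitelisted; expect true when the NameNode is reachable
    // (an unresolvable host would throw here rather than return false).
    System.out.println(Util.mayHaveBlockLocations(io, "hdfs://namenode:8020/warehouse/t"));
  }
}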
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -23,6 +23,7 @@
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -168,6 +169,15 @@ private static String implFromLocation(String location) {
return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
}

public Class<?> ioClass(String location) {
String fileIOClassName = implFromLocation(location);
try {
return Class.forName(fileIOClassName);
} catch (ClassNotFoundException e) {
throw new ValidationException("Class %s not found: %s", fileIOClassName, e.getMessage());
}
}

private static String scheme(String location) {
int colonPos = location.indexOf(":");
if (colonPos > 0) {
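The new ioClass accessor exposes the scheme-to-implementation lookup that implFromLocation already performs, without instantiating the resolved FileIO; this is what lets Util test for HadoopFileIO by class rather than by instance. A usage sketch (the s3 mapping reflects ResolvingFileIO's built-in scheme table as I understand it, so treat it as an assumption):

import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.ResolvingFileIO;

class IoClassSketch {
  static boolean resolvesToHadoop(ResolvingFileIO io, String location) {
    // Known schemes map to dedicated implementations (e.g. s3 -> S3FileIO);
    // everything else, including hdfs, falls back to HadoopFileIO. Note that
    // ioClass throws ValidationException if the resolved class is not on the classpath.
    return HadoopFileIO.class.isAssignableFrom(io.ioClass(location));
  }
}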
29 changes: 2 additions & 27 deletions flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java
@@ -18,27 +18,17 @@
*/
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SourceUtil {
private SourceUtil() {}

private static final Logger LOG = LoggerFactory.getLogger(SourceUtil.class);
private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");

static boolean isLocalityEnabled(
Table table, ReadableConfig readableConfig, Boolean exposeLocality) {
Boolean localityEnabled =
@@ -50,22 +40,7 @@ static boolean isLocalityEnabled(
return false;
}

FileIO fileIO = table.io();
if (fileIO instanceof HadoopFileIO) {
HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
try {
String scheme =
new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
} catch (IOException e) {
LOG.warn(
"Failed to determine whether the locality information can be exposed for table: {}",
table,
e);
}
}

return false;
return Util.mayHaveBlockLocations(table.io(), table.location());
}

/**
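For context, isLocalityEnabled after this patch reduces to the shape below. This is a reconstruction from the hunks above, with the elided config lookup stubbed out (the real code falls back to a Flink session default when exposeLocality is null):

import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.Util;

class SourceUtilSketch {
  static boolean isLocalityEnabled(
      Table table, ReadableConfig readableConfig, Boolean exposeLocality) {
    // Stand-in for the elided lines: an explicit flag wins over the
    // session-level default read from readableConfig.
    Boolean localityEnabled = exposeLocality;
    if (localityEnabled != null && !localityEnabled) {
      return false;
    }
    // Single shared path for HadoopFileIO and ResolvingFileIO.
    return Util.mayHaveBlockLocations(table.io(), table.location());
  }
}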
17 changes: 3 additions & 14 deletions spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -19,12 +19,9 @@
package org.apache.iceberg.spark;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.SparkSession;

@@ -48,8 +45,6 @@
*/
public class SparkReadConf {

private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private final SparkSession spark;
private final Table table;
private final Map<String, String> readOptions;
@@ -67,14 +62,8 @@ public boolean caseSensitive() {
}

public boolean localityEnabled() {
if (table.io() instanceof HadoopFileIO) {
HadoopInputFile file = (HadoopInputFile) table.io().newInputFile(table.location());
String scheme = file.getFileSystem().getScheme();
boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
}

return false;
boolean defaultValue = Util.mayHaveBlockLocations(table.io(), table.location());
return PropertyUtil.propertyAsBoolean(readOptions, SparkReadOptions.LOCALITY, defaultValue);
}

public Long snapshotId() {
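On the Spark side the inferred value is only a default: an explicit SparkReadOptions.LOCALITY read option still wins. A hypothetical read that forces locality off regardless of the table's filesystem (the "locality" option key is assumed from SparkReadOptions.LOCALITY):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class LocalityOverride {
  // Overrides whatever Util.mayHaveBlockLocations infers for the table.
  static Dataset<Row> readWithoutLocality(SparkSession spark, String tableName) {
    return spark.read().format("iceberg").option("locality", "false").load(tableName);
  }
}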
