forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-40815][SQL] Add
DelegateSymlinkTextInputFormat
to work around…
… `SymlinkTextInputSplit` bug ### What changes were proposed in this pull request? This PR is a follow-up for apache#31909. In the original PR, `spark.hadoopRDD.ignoreEmptySplits` was enabled due to seemingly no side-effects, however, this change breaks `SymlinkTextInputFormat` so any table that uses the input format would return empty results. This is due to a combination of problems: 1. Incorrect implementation of [SymlinkTextInputSplit](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L73). The input format does not set `start` and `length` fields from the target split. `SymlinkTextInputSplit` is an abstraction over FileSplit and all downstream systems treat it as such - those fields should be extracted and passed from the target split. 2. `spark.hadoopRDD.ignoreEmptySplits` being enabled causes HadoopRDD to filter out all of the empty splits which does not work in the case of SymlinkTextInputFormat. This is due to 1. Because we don't set any length (and start) those splits are considered to be empty and are removed from the final list of partitions even though the target splits themselves are non-empty. Technically, this needs to be addressed in Hive but I figured it would be much faster to fix this in Spark. The PR introduces `DelegateSymlinkTextInputFormat` that wraps SymlinkTextInputFormat and provides splits with the correct start and length attributes. This is controlled by `spark.sql.hive.useDelegateForSymlinkTextInputFormat` which is enabled by default. When disabled, the user-provided SymlinkTextInputFormat will be used. ### Why are the changes needed? Fixes a correctness issue when using `SymlinkTextInputSplit` in Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a unit test that reproduces the issue and verified that it passes with the fix. Closes apache#38277 from sadikovi/fix-symlink-input-format. 
Authored-by: Ivan Sadikov <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
- Loading branch information
1 parent
9fc3aa0
commit 668ab27
Showing
4 changed files
with
238 additions
and
3 deletions.
There are no files selected for viewing
121 changes: 121 additions & 0 deletions
121
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hadoop.hive.ql.io; | ||
|
||
import java.io.DataInput; | ||
import java.io.DataOutput; | ||
import java.io.IOException; | ||
|
||
import org.apache.hadoop.fs.ContentSummary; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.io.LongWritable; | ||
import org.apache.hadoop.io.Text; | ||
import org.apache.hadoop.mapred.FileSplit; | ||
import org.apache.hadoop.mapred.InputSplit; | ||
import org.apache.hadoop.mapred.JobConf; | ||
import org.apache.hadoop.mapred.RecordReader; | ||
import org.apache.hadoop.mapred.Reporter; | ||
|
||
/** | ||
* Delegate for SymlinkTextInputFormat, created to address SPARK-40815. | ||
* Fixes an issue where SymlinkTextInputFormat returns empty splits which could result in | ||
* the correctness issue when "spark.hadoopRDD.ignoreEmptySplits" is enabled. | ||
* <p> | ||
* In this class, we update the split start and length to match the target file input thus fixing | ||
* the issue. | ||
*/ | ||
public class DelegateSymlinkTextInputFormat extends SymlinkTextInputFormat { | ||
|
||
public static class DelegateSymlinkTextInputSplit extends FileSplit { | ||
private Path targetPath; // Path to the actual data file, not the symlink file. | ||
|
||
// Used for deserialisation. | ||
public DelegateSymlinkTextInputSplit() { | ||
super((Path) null, 0, 0, (String[]) null); | ||
targetPath = null; | ||
} | ||
|
||
public DelegateSymlinkTextInputSplit(SymlinkTextInputSplit split) throws IOException { | ||
// It is fine to set start and length to the target file split because | ||
// SymlinkTextInputFormat maintains 1-1 mapping between SymlinkTextInputSplit and FileSplit. | ||
super(split.getPath(), | ||
split.getTargetSplit().getStart(), | ||
split.getTargetSplit().getLength(), | ||
split.getTargetSplit().getLocations()); | ||
this.targetPath = split.getTargetSplit().getPath(); | ||
} | ||
|
||
/** | ||
* Returns target path. | ||
* Visible for testing. | ||
*/ | ||
public Path getTargetPath() { | ||
return targetPath; | ||
} | ||
|
||
/** | ||
* Reconstructs the delegate input split. | ||
*/ | ||
private SymlinkTextInputSplit getSplit() throws IOException { | ||
return new SymlinkTextInputSplit( | ||
getPath(), | ||
new FileSplit(targetPath, getStart(), getLength(), getLocations()) | ||
); | ||
} | ||
|
||
@Override | ||
public void write(DataOutput out) throws IOException { | ||
super.write(out); | ||
Text.writeString(out, (this.targetPath != null) ? this.targetPath.toString() : ""); | ||
} | ||
|
||
@Override | ||
public void readFields(DataInput in) throws IOException { | ||
super.readFields(in); | ||
String target = Text.readString(in); | ||
this.targetPath = (!target.isEmpty()) ? new Path(target) : null; | ||
} | ||
} | ||
|
||
@Override | ||
public RecordReader<LongWritable, Text> getRecordReader( | ||
InputSplit split, JobConf job, Reporter reporter) throws IOException { | ||
InputSplit targetSplit = ((DelegateSymlinkTextInputSplit) split).getSplit(); | ||
return super.getRecordReader(targetSplit, job, reporter); | ||
} | ||
|
||
@Override | ||
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { | ||
InputSplit[] splits = super.getSplits(job, numSplits); | ||
for (int i = 0; i < splits.length; i++) { | ||
SymlinkTextInputSplit split = (SymlinkTextInputSplit) splits[i]; | ||
splits[i] = new DelegateSymlinkTextInputSplit(split); | ||
} | ||
return splits; | ||
} | ||
|
||
@Override | ||
public void configure(JobConf job) { | ||
super.configure(job); | ||
} | ||
|
||
@Override | ||
public ContentSummary getContentSummary(Path p, JobConf job) throws IOException { | ||
return super.getContentSummary(p, job); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters