Skip to content

Commit

Permalink
[FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID t…
Browse files Browse the repository at this point in the history
…o in-progress file to avoid conflicts


This closes apache#12452
  • Loading branch information
gaoyunhaii authored Jun 9, 2020
1 parent 74231f7 commit 45f42f8
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 86 deletions.
37 changes: 37 additions & 0 deletions flink-formats/flink-hadoop-bulk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,43 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
* The default hadoop file committer factory which always use {@link HadoopRenameFileCommitter}.
*/
Expand All @@ -31,7 +33,19 @@ public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFac
private static final long serialVersionUID = 1L;

@Override
public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) {
public HadoopFileCommitter create(
Configuration configuration,
Path targetFilePath) throws IOException {

return new HadoopRenameFileCommitter(configuration, targetFilePath);
}

@Override
public HadoopFileCommitter recoverForCommit(
Configuration configuration,
Path targetFilePath,
Path tempFilePath) throws IOException {

return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface HadoopFileCommitter {
*
* @return The path of the intermediate file to commit.
*/
Path getInProgressFilePath();
Path getTempFilePath();

/**
* Prepares the intermediates file for committing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,25 @@
public interface HadoopFileCommitterFactory extends Serializable {

/**
* Creates the corresponding Hadoop file committer according to the Hadoop
* configuration and the target path.
* Creates a new Hadoop file committer for writing.
*
* @param configuration The hadoop configuration.
* @param targetFilePath The target path to commit.
* @param targetFilePath The target path to commit to.
* @return The corresponding Hadoop file committer.
*/
HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;

/**
* Creates a Hadoop file committer for commit the pending file.
*
* @param configuration The hadoop configuration.
* @param targetFilePath The target path to commit to.
* @param inProgressPath The path of the remaining pending file.
* @return The corresponding Hadoop file committer.
*/
HadoopFileCommitter recoverForCommit(
Configuration configuration,
Path targetFilePath,
Path inProgressPath) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,28 @@ public void commitAfterRecovery() throws IOException {

public PendingFileRecoverable getRecoverable() {
return new HadoopPathBasedPendingFileRecoverable(
fileCommitter.getTargetFilePath());
fileCommitter.getTargetFilePath(),
fileCommitter.getTempFilePath());
}
}

@VisibleForTesting
static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
private final Path path;
private final Path targetFilePath;

public HadoopPathBasedPendingFileRecoverable(Path path) {
this.path = path;
private final Path tempFilePath;

public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
this.targetFilePath = targetFilePath;
this.tempFilePath = tempFilePath;
}

public Path getTargetFilePath() {
return targetFilePath;
}

public Path getPath() {
return path;
public Path getTempFilePath() {
return tempFilePath;
}
}

Expand All @@ -142,14 +150,21 @@ public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) {
throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
}

Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
(HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
Path path = hadoopRecoverable.getTargetFilePath();
Path inProgressPath = hadoopRecoverable.getTempFilePath();

byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);

byte[] targetBytes = new byte[8 + pathBytes.length];
byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length];
ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(MAGIC_NUMBER);
bb.putInt(pathBytes.length);
bb.put(pathBytes);
bb.putInt(inProgressBytes.length);
bb.put(inProgressBytes);

return targetBytes;
}
Expand All @@ -171,11 +186,17 @@ private HadoopPathBasedPendingFileRecoverable deserializeV1(byte[] serialized) t
throw new IOException("Corrupt data: Unexpected magic number.");
}

byte[] pathBytes = new byte[bb.getInt()];
bb.get(pathBytes);
String targetPath = new String(pathBytes, CHARSET);
byte[] targetFilePathBytes = new byte[bb.getInt()];
bb.get(targetFilePathBytes);
String targetFilePath = new String(targetFilePathBytes, CHARSET);

byte[] tempFilePathBytes = new byte[bb.getInt()];
bb.get(tempFilePathBytes);
String tempFilePath = new String(tempFilePathBytes, CHARSET);

return new HadoopPathBasedPendingFileRecoverable(new Path(targetPath));
return new HadoopPathBasedPendingFileRecoverable(
new Path(targetFilePath),
new Path(tempFilePath));
}
}

Expand All @@ -202,7 +223,9 @@ public InProgressFileRecoverable deserialize(int version, byte[] serialized) {
}

/**
* Factory to create {@link HadoopPathBasedPartFileWriter}.
* Factory to create {@link HadoopPathBasedPartFileWriter}. This writer does not support snapshotting
* the in-progress files. For pending files, it stores the target path and the staging file path into
* the state.
*/
public static class HadoopPathBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final Configuration configuration;
Expand Down Expand Up @@ -230,7 +253,7 @@ public HadoopPathBasedPartFileWriter<IN, BucketID> openNewInProgressFile(
Path path = new Path(flinkPath.toUri());
HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);

Path inProgressFilePath = fileCommitter.getInProgressFilePath();
Path inProgressFilePath = fileCommitter.getTempFilePath();
HadoopPathBasedBulkWriter<IN> writer = bulkWriterFactory.create(path, inProgressFilePath);
return new HadoopPathBasedPartFileWriter<>(bucketID, writer, fileCommitter, creationTime);
}
Expand All @@ -241,8 +264,12 @@ public PendingFile recoverPendingFile(PendingFileRecoverable pendingFileRecovera
throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
}

Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
return new HadoopPathBasedPendingFile(fileCommitterFactory.create(configuration, path));
HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
(HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
return new HadoopPathBasedPendingFile(fileCommitterFactory.recoverForCommit(
configuration,
hadoopRecoverable.getTargetFilePath(),
hadoopRecoverable.getTempFilePath()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand All @@ -39,12 +40,22 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {

private final Path targetFilePath;

private final Path inProgressFilePath;
private final Path tempFilePath;

public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) {
public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) throws IOException {
this.configuration = configuration;
this.targetFilePath = targetFilePath;
this.inProgressFilePath = generateInProgressFilePath();
this.tempFilePath = generateTempFilePath();
}

public HadoopRenameFileCommitter(
Configuration configuration,
Path targetFilePath,
Path inProgressPath) throws IOException {

this.configuration = configuration;
this.targetFilePath = targetFilePath;
this.tempFilePath = inProgressPath;
}

@Override
Expand All @@ -53,8 +64,8 @@ public Path getTargetFilePath() {
}

@Override
public Path getInProgressFilePath() {
return inProgressFilePath;
public Path getTempFilePath() {
return tempFilePath;
}

@Override
Expand All @@ -75,11 +86,10 @@ public void commitAfterRecovery() throws IOException {
private void rename(boolean assertFileExists) throws IOException {
FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);

if (!fileSystem.exists(inProgressFilePath)) {
if (!fileSystem.exists(tempFilePath)) {
if (assertFileExists) {
throw new IOException(String.format("In progress file(%s) not exists.", inProgressFilePath));
throw new IOException(String.format("In progress file(%s) not exists.", tempFilePath));
} else {

// By pass the re-commit if source file not exists.
// TODO: in the future we may also need to check if the target file exists.
return;
Expand All @@ -88,20 +98,27 @@ private void rename(boolean assertFileExists) throws IOException {

try {
// If file exists, it will be overwritten.
fileSystem.rename(inProgressFilePath, targetFilePath);
fileSystem.rename(tempFilePath, targetFilePath);
} catch (IOException e) {
throw new IOException(
String.format("Could not commit file from %s to %s", inProgressFilePath, targetFilePath),
String.format("Could not commit file from %s to %s", tempFilePath, targetFilePath),
e);
}
}

private Path generateInProgressFilePath() {
private Path generateTempFilePath() throws IOException {
checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");

FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);

Path parent = targetFilePath.getParent();
String name = targetFilePath.getName();

return new Path(parent, "." + name + ".inprogress");
while (true) {
Path candidate = new Path(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
if (!fileSystem.exists(candidate)) {
return candidate;
}
}
}
}
Loading

0 comments on commit 45f42f8

Please sign in to comment.