Skip to content

Commit

Permalink
[hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 4, 2018
1 parent 7dc8e69 commit 0c791cf
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -141,7 +141,7 @@ public FileSystem create(URI fsUri) throws IOException {
final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);

return new FlinkS3FileSystem(
fs,
Expand All @@ -166,6 +166,6 @@ protected abstract URI getInitURI(
URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);

@Nullable
protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;

@Nullable
private final S3MultiPartUploader s3UploadHelper;
private final S3AccessHelper s3AccessHelper;

private final Executor uploadThreadPool;

Expand All @@ -83,7 +83,7 @@ public FlinkS3FileSystem(
String localTmpDirectory,
@Nullable String entropyInjectionKey,
int entropyLength,
@Nullable S3MultiPartUploader s3UploadHelper,
@Nullable S3AccessHelper s3UploadHelper,
long s3uploadPartSize,
int maxConcurrentUploadsPerStream) {

Expand All @@ -99,7 +99,7 @@ public FlinkS3FileSystem(
// recoverable writer parameter configuration initialization
this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
this.s3UploadHelper = s3UploadHelper;
this.s3AccessHelper = s3UploadHelper;
this.uploadThreadPool = Executors.newCachedThreadPool();

Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
Expand Down Expand Up @@ -131,15 +131,15 @@ public String getLocalTmpDir() {

@Override
public RecoverableWriter createRecoverableWriter() throws IOException {
if (s3UploadHelper == null) {
if (s3AccessHelper == null) {
// this is the case for Presto
throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
}

return S3RecoverableWriter.writer(
getHadoopFileSystem(),
tmpFileCreator,
s3UploadHelper,
s3AccessHelper,
uploadThreadPool,
s3uploadPartSize,
maxConcurrentUploadsPerStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
@NotThreadSafe
final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {

private final S3MultiPartUploader s3MPUploader;
private final S3AccessHelper s3AccessHelper;

private final Executor uploadThreadPool;

Expand All @@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// ------------------------------------------------------------------------

private RecoverableMultiPartUploadImpl(
S3MultiPartUploader s3uploader,
S3AccessHelper s3AccessHelper,
Executor uploadThreadPool,
String uploadId,
String objectName,
Expand All @@ -81,7 +81,7 @@ private RecoverableMultiPartUploadImpl(
) {
checkArgument(numBytes >= 0L);

this.s3MPUploader = checkNotNull(s3uploader);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.uploadThreadPool = checkNotNull(uploadThreadPool);
this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
Expand Down Expand Up @@ -111,7 +111,7 @@ public void uploadPart(RefCountedFSOutputStream file) throws IOException {
currentUploadInfo.registerNewPart(partLength);

file.retain(); // keep the file while the async upload still runs
uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
}

@Override
Expand All @@ -124,7 +124,7 @@ public S3Committer snapshotAndGetCommitter() throws IOException {
final S3Recoverable snapshot = snapshotAndGetRecoverable(null);

return new S3Committer(
s3MPUploader,
s3AccessHelper,
snapshot.getObjectName(),
snapshot.uploadId(),
snapshot.parts(),
Expand Down Expand Up @@ -179,7 +179,7 @@ private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) th
// they do not fall under the user's global TTL on S3.
// Figure out a way to clean them.

s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
}
finally {
file.release();
Expand Down Expand Up @@ -244,14 +244,14 @@ private String createTmpObjectName() {
// ------------------------------------------------------------------------

public static RecoverableMultiPartUploadImpl newUpload(
final S3MultiPartUploader s3uploader,
final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String objectName) throws IOException {

final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);

return new RecoverableMultiPartUploadImpl(
s3uploader,
s3AccessHelper,
uploadThreadPool,
multiPartUploadId,
objectName,
Expand All @@ -261,7 +261,7 @@ public static RecoverableMultiPartUploadImpl newUpload(
}

public static RecoverableMultiPartUploadImpl recoverUpload(
final S3MultiPartUploader s3uploader,
final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String multipartUploadId,
final String objectName,
Expand All @@ -270,7 +270,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(
final Optional<File> incompletePart) {

return new RecoverableMultiPartUploadImpl(
s3uploader,
s3AccessHelper,
uploadThreadPool,
multipartUploadId,
objectName,
Expand All @@ -286,7 +286,7 @@ public static RecoverableMultiPartUploadImpl recoverUpload(

private static class UploadTask implements Runnable {

private final S3MultiPartUploader s3uploader;
private final S3AccessHelper s3AccessHelper;

private final String objectName;

Expand All @@ -299,7 +299,7 @@ private static class UploadTask implements Runnable {
private final CompletableFuture<PartETag> future;

UploadTask(
final S3MultiPartUploader s3uploader,
final S3AccessHelper s3AccessHelper,
final MultiPartUploadInfo currentUpload,
final RefCountedFSOutputStream file,
final CompletableFuture<PartETag> future) {
Expand All @@ -313,15 +313,15 @@ private static class UploadTask implements Runnable {
// these are limits put by Amazon
checkArgument(partNumber >= 1 && partNumber <= 10_000);

this.s3uploader = checkNotNull(s3uploader);
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.file = checkNotNull(file);
this.future = checkNotNull(future);
}

@Override
public void run() {
try (final InputStream inputStream = file.getInputStream()) {
final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
future.complete(new PartETag(result.getPartNumber(), result.getETag()));
file.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* the upload with all its parts will be either committed or discarded.
*/
@Internal
public interface S3MultiPartUploader {
public interface S3AccessHelper {

/**
* Initializes a Multi-Part Upload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe

private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);

private final S3MultiPartUploader s3uploader;
private final S3AccessHelper s3AccessHelper;

private final String uploadId;

Expand All @@ -50,8 +50,8 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe

private final long totalLength;

S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
this.s3uploader = checkNotNull(s3uploader);
S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.objectName = checkNotNull(objectName);
this.uploadId = checkNotNull(uploadId);
this.parts = checkNotNull(parts);
Expand All @@ -64,7 +64,7 @@ public void commit() throws IOException {
LOG.info("Committing {} with MPU ID {}", objectName, uploadId);

final AtomicInteger errorCount = new AtomicInteger();
s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);

if (errorCount.get() == 0) {
LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
Expand All @@ -82,14 +82,14 @@ public void commitAfterRecovery() throws IOException {
LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);

try {
s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
} catch (IOException e) {
LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
"Checking if file was committed before...", objectName, uploadId);
LOG.trace("Exception when committing:", e);

try {
ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName);
if (totalLength != metadata.getContentLength()) {
String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {

private final org.apache.hadoop.fs.FileSystem fs;

private final S3MultiPartUploader twoPhaseUploader;
private final S3AccessHelper s3AccessHelper;

private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;

Expand All @@ -53,22 +53,22 @@ final class S3RecoverableMultipartUploadFactory {

S3RecoverableMultipartUploadFactory(
final FileSystem fs,
final S3MultiPartUploader twoPhaseUploader,
final S3AccessHelper s3AccessHelper,
final int maxConcurrentUploadsPerStream,
final Executor executor,
final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {

this.fs = Preconditions.checkNotNull(fs);
this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
this.executor = executor;
this.twoPhaseUploader = twoPhaseUploader;
this.s3AccessHelper = s3AccessHelper;
this.tmpFileSupplier = tmpFileSupplier;
}

RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {

return RecoverableMultiPartUploadImpl.newUpload(
twoPhaseUploader,
s3AccessHelper,
limitedExecutor(),
pathToObjectName(path));
}
Expand All @@ -77,7 +77,7 @@ RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) t
final Optional<File> incompletePart = downloadLastDataChunk(recoverable);

return RecoverableMultiPartUploadImpl.recoverUpload(
twoPhaseUploader,
s3AccessHelper,
limitedExecutor(),
recoverable.uploadId(),
recoverable.getObjectName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecover
public static S3RecoverableWriter writer(
final FileSystem fs,
final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
final S3MultiPartUploader twoPhaseUploader,
final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final long userDefinedMinPartSize,
final int maxConcurrentUploadsPerStream) {
Expand All @@ -139,7 +139,7 @@ public static S3RecoverableWriter writer(
final S3RecoverableMultipartUploadFactory uploadFactory =
new S3RecoverableMultipartUploadFactory(
fs,
twoPhaseUploader,
s3AccessHelper,
maxConcurrentUploadsPerStream,
uploadThreadPool,
tempFileCreator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.fs.s3.common;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.util.TestLogger;

import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -78,7 +78,7 @@ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopC

@Nullable
@Override
protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,10 @@ public void execute(Runnable command) {
}

/**
* A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
* A {@link S3AccessHelper} that simulates uploading part files to S3 by
* simply putting complete and incomplete part files in lists for further validation.
*/
private static class StubMultiPartUploader implements S3MultiPartUploader {
private static class StubMultiPartUploader implements S3AccessHelper {

private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
Expand Down
Loading

0 comments on commit 0c791cf

Please sign in to comment.