Skip to content

Commit

Permalink
[hotfix][fs-connector] Refactor Bucket to statically import Preconditions.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 4, 2018
1 parent 0c1328a commit 7dc8e69
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +37,9 @@
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
Expand Down Expand Up @@ -84,13 +86,13 @@ private Bucket(
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy) {

this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.bucketId = checkNotNull(bucketId);
this.bucketPath = checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
this.partFileFactory = checkNotNull(partFileFactory);
this.rollingPolicy = checkNotNull(rollingPolicy);

this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
this.pendingPartsPerCheckpoint = new HashMap<>();
Expand Down Expand Up @@ -158,17 +160,17 @@ boolean isActive() {
}

void merge(final Bucket<IN, BucketID> bucket) throws IOException {
Preconditions.checkNotNull(bucket);
Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));
checkNotNull(bucket);
checkState(Objects.equals(bucket.bucketPath, bucketPath));

// There should be no pending files in the "to-merge" states.
// The reason is that:
// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
// So a snapshot, including the one we are recovering from, will never contain such files.
// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).

Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
checkState(bucket.pendingPartsPerCheckpoint.isEmpty());

RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
if (committable != null) {
Expand Down Expand Up @@ -257,7 +259,7 @@ private void prepareBucketForCheckpointing(long checkpointId) throws IOException
}

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter);
checkNotNull(fsWriter);

Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
pendingPartsPerCheckpoint.entrySet().iterator();
Expand Down

0 comments on commit 7dc8e69

Please sign in to comment.