Skip to content

Commit

Permalink
[FLINK-10097][DataStream API] Additional tests for StreamingFileSink.
Browse files: browse the repository at this point in the history.
  • Loading branch information
kl0u committed Nov 1, 2018
1 parent f4f9416 commit 9ad932c
Show file tree
Hide file tree
Showing 8 changed files with 656 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -59,10 +62,11 @@ public class Bucket<IN, BucketID> {

private final RollingPolicy<IN, BucketID> rollingPolicy;

private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;

private long partCounter;

@Nullable
private PartFileWriter<IN, BucketID> inProgressPart;

private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
Expand All @@ -88,6 +92,7 @@ private Bucket(
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);

this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
this.pendingPartsPerCheckpoint = new HashMap<>();
}

/**
Expand Down Expand Up @@ -277,6 +282,24 @@ void onProcessingTime(long timestamp) throws IOException {
}
}

// --------------------------- Testing Methods -----------------------------

/**
 * Test-only accessor for the {@code pendingPartsPerCheckpoint} map.
 *
 * <p>The returned map is the bucket's own instance, not a copy, so any change made
 * through it is visible to the bucket.
 *
 * @return the map held in the {@code pendingPartsPerCheckpoint} field.
 */
@VisibleForTesting
Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
	return this.pendingPartsPerCheckpoint;
}

/**
 * Test-only accessor for the {@code inProgressPart} field.
 *
 * @return the writer currently stored in {@code inProgressPart}, or {@code null}
 *         when the field is unset.
 */
@Nullable
@VisibleForTesting
PartFileWriter<IN, BucketID> getInProgressPart() {
	return this.inProgressPart;
}

/**
 * Test-only accessor for the {@code pendingPartsForCurrentCheckpoint} list.
 *
 * <p>The returned list is the bucket's own instance, not a copy.
 *
 * @return the list held in the {@code pendingPartsForCurrentCheckpoint} field.
 */
@VisibleForTesting
List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
	return this.pendingPartsForCurrentCheckpoint;
}

// --------------------------- Static Factory Methods -----------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -186,6 +187,10 @@ private void handleRestoredBucketState(final BucketState<BucketID> recoveredStat
}

private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
if (!restoredBucket.isActive()) {
return;
}

final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
if (bucket != null) {
bucket.merge(restoredBucket);
Expand Down Expand Up @@ -224,6 +229,9 @@ void snapshotState(
LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
subtaskIndex, checkpointId, maxPartCounter);

bucketStatesContainer.clear();
partCounterStateContainer.clear();

snapshotActiveBuckets(checkpointId, bucketStatesContainer);
partCounterStateContainer.add(maxPartCounter);
}
Expand Down Expand Up @@ -341,4 +349,16 @@ public Long timestamp() {
return elementTimestamp;
}
}

// --------------------------- Testing Methods -----------------------------

/**
 * Test-only accessor for the {@code maxPartCounter} field.
 *
 * @return the current value of {@code maxPartCounter}.
 */
@VisibleForTesting
public long getMaxPartCounter() {
	return this.maxPartCounter;
}

/**
 * Test-only accessor for the {@code activeBuckets} map.
 *
 * <p>The returned map is the live instance, not a copy, so any change made through
 * it is visible to this object.
 *
 * @return the map held in the {@code activeBuckets} field, keyed by bucket id.
 */
@VisibleForTesting
Map<BucketID, Bucket<IN, BucketID>> getActiveBuckets() {
	return this.activeBuckets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");

bucketStates.clear();
maxPartCountersState.clear();

buckets.snapshotState(
context.getCheckpointId(),
bucketStates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN elemen

/**
 * Decides, on a processing-time callback, whether the part file described by
 * {@code partFileState} should be rolled.
 *
 * <p>Both thresholds are inclusive: the part file rolls as soon as the elapsed time
 * <em>reaches</em> the configured interval, not only once it exceeds it.
 *
 * @param partFileState state of the part file; its creation time and last update
 *                      time are read.
 * @param currentTime the current time, in the same unit as the part file timestamps.
 * @return {@code true} if {@code currentTime - getCreationTime()} is at least
 *         {@code rolloverInterval}, or {@code currentTime - getLastUpdateTime()} is
 *         at least {@code inactivityInterval}; {@code false} otherwise.
 */
@Override
public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
	// A single return statement: a second one after it would be unreachable code.
	return currentTime - partFileState.getCreationTime() >= rolloverInterval ||
		currentTime - partFileState.getLastUpdateTime() >= inactivityInterval;
}

/**
Expand Down
Loading

0 comments on commit 9ad932c

Please sign in to comment.