[FLINK-10027][DataStream API] Add logging to StreamingFileSink.
This closes apache#6477.
kl0u committed Aug 2, 2018
1 parent 1b0baa1 commit 852502b
Showing 3 changed files with 85 additions and 4 deletions.
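For context: most of the statements added here are guarded by LOG.isDebugEnabled(), so they only appear once the logger for the org.apache.flink.streaming.api.functions.sink.filesystem package is set to DEBUG in your logging backend; the checkpoint lifecycle messages log at INFO. A minimal sketch of a job that exercises these code paths (the output path is a placeholder, and the builder calls reflect the 1.6-era row-format API as far as I can tell):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamingFileSink finalizes part files on checkpoints, so enable them to see
// the "checkpointing ..." / "received completion notification ..." INFO lines below.
env.enableCheckpointing(10_000L);
env.fromElements("a", "b", "c")
        .addSink(StreamingFileSink
                .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                .build());
env.execute("streaming-file-sink-logging-demo");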
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,6 +23,9 @@
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,12 +38,13 @@
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
* <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
- * {@link BucketAssigner Bucketer} is queried to see in which bucket this element should
- * be written to.
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
*/
@Internal
public class Bucket<IN, BucketID> {

private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);

private static final String PART_PREFIX = "part";

private final BucketID bucketId;
@@ -111,6 +115,7 @@ private Bucket(
}

private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {

// we try to resume the previous in-progress file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
@@ -162,18 +167,36 @@ void merge(final Bucket<IN, BucketID> bucket) throws IOException {
if (committable != null) {
pendingPartsForCurrentCheckpoint.add(committable);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} merging buckets for bucket id={}", subtaskIndex, bucketId);
}
}

void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
subtaskIndex, bucketId, element);
}

rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
}

private void rollPartFile(final long currentTime) throws IOException {
closePartFile();
-inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);

+final Path partFilePath = assembleNewPartPath();
+inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);

if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
subtaskIndex, partFilePath.getName(), bucketId);
}

partCounter++;
}

@@ -213,6 +236,9 @@ BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOExcept

private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
}
closePartFile();
}

@@ -242,6 +268,11 @@ void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {

void onProcessingTime(long timestamp) throws IOException {
if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
"(in-progress file created @ {}, last updated @ {} and current time is {}).",
subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
}
closePartFile();
}
}
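The debug lines added above fire from the three RollingPolicy hooks that Bucket consults: shouldRollOnEvent, shouldRollOnCheckpoint, and shouldRollOnProcessingTime. A sketch of a custom policy, assuming the RollingPolicy and PartFileInfo interfaces in this package keep the shape the code above calls into; the 60-second idle threshold is an arbitrary illustration:

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

public class RollOnCheckpointOrIdlePolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
        // Returning true is what triggers the
        // "closing in-progress part file for bucket id={} on checkpoint" debug line.
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        // Never roll on an element, so write() skips its "due to element" debug line.
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
        // Roll files idle for over a minute; onProcessingTime() logs the
        // creation/last-update/current timestamps this comparison used.
        return currentTime - partFileState.getLastUpdateTime() > 60_000L;
    }
}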
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -96,4 +96,26 @@ RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
return committableFilesPerCheckpoint;
}

@Override
public String toString() {
final StringBuilder strBuilder = new StringBuilder();

strBuilder
.append("BucketState for bucketId=").append(bucketId)
.append(" and bucketPath=").append(bucketPath);

if (hasInProgressResumableFile()) {
strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
}

if (!committableFilesPerCheckpoint.isEmpty()) {
strBuilder.append(", has pending files for checkpoints: {");
for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
strBuilder.append(checkpointId).append(' ');
}
strBuilder.append('}');
}
return strBuilder.toString();
}
}
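Given the builder logic above, a state with one open part file and pending files for checkpoints 1 and 2 would render along these lines (bucket id, path, and timestamp made up for illustration; note the trailing space before the closing brace that the keySet loop produces):

BucketState for bucketId=2018-08-02--17 and bucketPath=/tmp/out/2018-08-02--17, has open part file created @ 1533222000000, has pending files for checkpoints: {1 2 }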
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -27,6 +27,9 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
@@ -47,6 +50,8 @@
@Internal
public class Buckets<IN, BucketID> {

private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);

// ------------------------ configuration fields --------------------------

private final Path basePath;
@@ -102,7 +107,13 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();

-this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+try {
+this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+} catch (IOException e) {
+LOG.error("Unable to create filesystem for path: {}", basePath);
+throw e;
+}

this.bucketStateSerializer = new BucketStateSerializer<>(
fsWriter.getResumeRecoverableSerializer(),
fsWriter.getCommitRecoverableSerializer(),
@@ -129,7 +140,11 @@ public class Buckets<IN, BucketID> {
* in-progress/pending part files
*/
void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {

initializePartCounter(partCounterState);

LOG.info("Subtask {} initializing its state (max part counter={}).", subtaskIndex, maxPartCounter);

initializeActiveBuckets(bucketStates);
}

@@ -153,6 +168,10 @@ private void initializeActiveBuckets(final ListState<byte[]> bucketStates) throw
private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
final BucketID bucketId = recoveredState.getBucketId();

if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
}

final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
fsWriter,
@@ -179,6 +198,8 @@ void commitUpToCheckpoint(final long checkpointId) throws IOException {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
activeBuckets.entrySet().iterator();

LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);

while (activeBucketIt.hasNext()) {
final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
@@ -200,6 +221,9 @@ void snapshotState(
fsWriter != null && bucketStateSerializer != null,
"sink has not been initialized");

LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
subtaskIndex, checkpointId, maxPartCounter);

snapshotActiveBuckets(checkpointId, bucketStatesContainer);
partCounterStateContainer.add(maxPartCounter);
}
@@ -215,6 +239,10 @@ private void snapshotActiveBuckets(
.writeVersionAndSerialize(bucketStateSerializer, bucketState);

bucketStatesContainer.add(serializedBucketState);

if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
}
}
}

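Putting the Buckets-level messages together, one restore-and-checkpoint cycle for subtask 0 would read roughly as follows (logger names shortened, ids and counters made up; the DEBUG lines appear only if that level is enabled):

INFO  Buckets - Subtask 0 initializing its state (max part counter=3).
DEBUG Buckets - Subtask 0 restoring: BucketState for bucketId=2018-08-02--17 and bucketPath=/tmp/out/2018-08-02--17
INFO  Buckets - Subtask 0 checkpointing for checkpoint with id=7 (max part counter=5).
DEBUG Buckets - Subtask 0 checkpointing: BucketState for bucketId=2018-08-02--17 and bucketPath=/tmp/out/2018-08-02--17
INFO  Buckets - Subtask 0 received completion notification for checkpoint with id=7.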
