Skip to content

Commit

Permalink
[FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Aug 24, 2017
1 parent 5456cf9 commit 6642768
Show file tree
Hide file tree
Showing 126 changed files with 318 additions and 12,512 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -132,7 +131,7 @@
@Deprecated
public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction,
CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
CheckpointListener {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -758,25 +757,6 @@ private void handleRestoredBucketState(BucketState bucketState) {
}
}

// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
}

handleRestoredBucketState(state);
}

// --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock;
Expand Down Expand Up @@ -154,8 +153,7 @@
*/
public class BucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -872,25 +870,6 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
}
}

// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(RollingSink.BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}

handleRestoredRollingSinkState(state);
}

// --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
Expand Down

This file was deleted.

Loading

0 comments on commit 6642768

Please sign in to comment.