Skip to content

Commit

Permalink
[FLINK-13117][table-planner-blink] Do not need to backup and restore …
Browse files Browse the repository at this point in the history
…streamEnv config in BatchExecutor

This closes apache#9179
  • Loading branch information
Xupingyong authored and wuchong committed Jul 23, 2019
1 parent 2d8ba48 commit 399de8b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
@Internal
public class BatchExecutor extends ExecutorBase {

private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();

@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
Expand All @@ -58,10 +56,9 @@ public JobExecutionResult execute(String jobName) throws Exception {
}

/**
* Backup previous streamEnv config and set batch configs.
* Sets batch configs.
*/
private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
batchExecEnvConfig.backup(execEnv);
private void setBatchProperties(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
Expand All @@ -77,7 +74,7 @@ private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
*/
public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
backupAndUpdateStreamEnv(execEnv);
setBatchProperties(execEnv);
transformations.forEach(execEnv::addOperator);
StreamGraph streamGraph;
streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
Expand All @@ -91,11 +88,12 @@ public StreamGraph generateStreamGraph(List<Transformation<?>> transformations,
streamGraph.setChaining(true);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
}
if (isShuffleModeAllBatch()) {
streamGraph.setBlockingConnectionsBetweenChains(true);
}
batchExecEnvConfig.restore(execEnv);
return streamGraph;
}

Expand All @@ -109,45 +107,4 @@ private boolean isShuffleModeAllBatch() {
}
return false;
}

/**
* Batch configs that are set in {@link StreamExecutionEnvironment}. We should backup and change
* these configs and restore finally.
*/
private static class BatchExecEnvConfig {

private boolean enableObjectReuse;
private long latencyTrackingInterval;
private long bufferTimeout;
private TimeCharacteristic timeCharacteristic;
private InputDependencyConstraint inputDependencyConstraint;

/**
* Backup previous streamEnv config.
*/
public void backup(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
enableObjectReuse = executionConfig.isObjectReuseEnabled();
latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
timeCharacteristic = execEnv.getStreamTimeCharacteristic();
bufferTimeout = execEnv.getBufferTimeout();
inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
}

/**
* Restore previous streamEnv after execute batch jobs.
*/
public void restore(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
if (enableObjectReuse) {
executionConfig.enableObjectReuse();
} else {
executionConfig.disableObjectReuse();
}
executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
execEnv.setStreamTimeCharacteristic(timeCharacteristic);
execEnv.setBufferTimeout(bufferTimeout);
executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
}
}
}

This file was deleted.

0 comments on commit 399de8b

Please sign in to comment.