Skip to content

Commit

Permalink
[hotfix][table-planner-blink] Enable idle state cleanup for Changelog…
Browse files Browse the repository at this point in the history
…Normalize operator
  • Loading branch information
wuchong committed Nov 25, 2020
1 parent bd5bc76 commit 62657de
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,14 @@ class StreamExecChangelogNormalize(
val tableConfig = planner.getTableConfig
val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
val stateIdleTime = tableConfig.getIdleStateRetention.toMillis
val operator = if (isMiniBatchEnabled) {
val exeConfig = planner.getExecEnv.getConfig
val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
val processFunction = new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
rowTypeInfo,
rowSerializer,
// disable state ttl, the changelog normalize should keep all state to have data integrity
// we can enable state ttl if this is really needed in some cases
-1,
stateIdleTime,
generateUpdateBefore,
true, // generateInsert
false) // inputInsertOnly
Expand All @@ -112,7 +111,7 @@ class StreamExecChangelogNormalize(
} else {
val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
rowTypeInfo,
-1, // disable state ttl
stateIdleTime,
generateUpdateBefore,
true, // generateInsert
false) // inputInsertOnly
Expand Down

0 comments on commit 62657de

Please sign in to comment.