Skip to content

Commit

Permalink
[FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from Fl…
Browse files Browse the repository at this point in the history
…inkChangelogModeInferenceProgram and StreamNonDeterministicUpdatePlanVisitor
  • Loading branch information
lincoln-lil committed Sep 25, 2024
1 parent 1543c72 commit 00625b0
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
Expand Down Expand Up @@ -184,8 +183,6 @@ public StreamPhysicalRel visit(
return visitOverAggregate((StreamPhysicalOverAggregateBase) rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalRank) {
return visitRank((StreamPhysicalRank) rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalDeduplicate) {
return visitDeduplicate((StreamPhysicalDeduplicate) rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalWindowDeduplicate) {
return visitWindowDeduplicate(
(StreamPhysicalWindowDeduplicate) rel, requireDeterminism);
Expand Down Expand Up @@ -677,22 +674,6 @@ private StreamPhysicalRel visitRank(
}
}

private StreamPhysicalRel visitDeduplicate(
final StreamPhysicalDeduplicate dedup, final ImmutableBitSet requireDeterminism) {
// output row type same as input and does not change output columns' order
if (inputInsertOnly(dedup)) {
// similar to rank, output is deterministic when input is insert only, so required
// determinism always be satisfied here.
return transmitDeterminismRequirement(dedup, NO_REQUIRED_DETERMINISM);
} else {
// Deduplicate always has unique key currently(exec node has null check and inner
// state only support data with keys), so only pass the left columns of required
// determinism to input.
return transmitDeterminismRequirement(
dedup, requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys())));
}
}

private StreamPhysicalRel visitWindowDeduplicate(
final StreamPhysicalWindowDeduplicate winDedup,
final ImmutableBitSet requireDeterminism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
// ignore required trait from context, because sink is the true root
sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

case deduplicate: StreamPhysicalDeduplicate =>
// deduplicate only support insert only as input
val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
// only proctime first row deduplicate does not produce UPDATE changes
ModifyKindSetTrait.INSERT_ONLY
} else {
// other deduplicate produce update changes
ModifyKindSetTrait.ALL_CHANGES
}
createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)

case agg: StreamPhysicalGroupAggregate =>
// agg support all changes in input
val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
Expand Down Expand Up @@ -490,7 +478,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate,
Expand Down

0 comments on commit 00625b0

Please sign in to comment.