[FLINK-36095][table-planner] KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering
lincoln-lil committed Aug 21, 2024
1 parent f57ebac commit 624bc50
Showing 12 changed files with 426 additions and 70 deletions.
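
Background for the fix: KeyedLookupJoinWrapper keeps lookup-join state per key, so the -U/+U records of the same upstream row must reach the same subtask in their original order. Shuffling by the join key breaks this whenever the join-key column itself is updated, because the retraction and the new row hash to different subtasks. The sketch below (plain Java, not Flink code; all key values are illustrative) shows the effect:

import java.util.List;

public class ShuffleDisorderSketch {
    record Change(String kind, int orderId, int customerId) {}

    public static void main(String[] args) {
        // order_id is the upsert key; customer_id is the join key being updated.
        List<Change> changelog = List.of(
                new Change("-U", 1, 3),  // retract the old row of order 1
                new Change("+U", 1, 5)); // the new row of order 1

        int parallelism = 4;
        for (Change c : changelog) {
            int byJoinKey = Math.floorMod(Integer.hashCode(c.customerId()), parallelism);
            int byUpsertKey = Math.floorMod(Integer.hashCode(c.orderId()), parallelism);
            System.out.printf("%s order=%d: joinKey -> subtask %d, upsertKey -> subtask %d%n",
                    c.kind(), c.orderId(), byJoinKey, byUpsertKey);
        }
        // By join key the pair splits across subtasks 3 and 1 and may be processed
        // in either order; by upsert key both records stay on subtask 1.
    }
}

Shuffling by the input's upsert key (or by all columns when no upsert key is known) keeps each logical row's changelog on one subtask, which is exactly what the diff below implements.
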
@@ -42,7 +42,6 @@
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
@@ -62,10 +61,11 @@

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.PARTITIONER_TRANSFORMATION;

@@ -84,6 +84,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin
"lookupKeyContainsPrimaryKey";

public static final String STATE_NAME = "lookupJoinState";
public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey";

@JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY)
private final boolean lookupKeyContainsPrimaryKey;
@@ -97,6 +98,11 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<StateMetadata> stateMetadataList;

@Nullable
@JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY)
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
private final int[] inputUpsertKey;

public StreamExecLookupJoin(
ReadableConfig tableConfig,
FlinkJoinType joinType,
@@ -111,6 +117,7 @@ public StreamExecLookupJoin(
@Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
@Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
ChangelogMode inputChangelogMode,
@Nullable int[] inputUpsertKey,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -130,6 +137,7 @@ public StreamExecLookupJoin(
asyncLookupOptions,
retryOptions,
inputChangelogMode,
inputUpsertKey,
// serialize state meta only when upsert materialize is enabled
upsertMaterialize
? StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME)
@@ -164,6 +172,7 @@ public StreamExecLookupJoin(
LookupJoinUtil.RetryLookupOptions retryOptions,
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) @Nullable
ChangelogMode inputChangelogMode,
@JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) @Nullable int[] inputUpsertKey,
@JsonProperty(FIELD_NAME_STATE) @Nullable List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@@ -187,11 +196,11 @@ public StreamExecLookupJoin(
description);
this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
this.upsertMaterialize = upsertMaterialize;
this.inputUpsertKey = inputUpsertKey;
this.stateMetadataList = stateMetadataList;
}

@Override
@SuppressWarnings("unchecked")
public Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
return createJoinTransformation(
@@ -246,39 +255,28 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
KeyedProcessOperator<RowData, RowData, RowData> operator =
new KeyedProcessOperator<>(keyedLookupJoinWrapper);

List<Integer> refKeys =
allLookupKeys.values().stream()
.filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)
.map(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index)
.collect(Collectors.toList());
RowDataKeySelector keySelector;

// use single parallelism for empty key shuffle
boolean singleParallelism = refKeys.isEmpty();
if (singleParallelism) {
// all lookup keys are constants, then use an empty key selector
keySelector = EmptyRowDataKeySelector.INSTANCE;
int[] shuffleKeys;
if (inputUpsertKey == null || inputUpsertKey.length == 0) {
// input has no upsertKey, so use all columns for the key selector
shuffleKeys = IntStream.range(0, inputRowType.getFieldCount()).toArray();
} else {
shuffleKeys = inputUpsertKey;
// make it a deterministic asc order
Collections.sort(refKeys);
keySelector =
KeySelectorUtil.getRowDataSelector(
classLoader,
refKeys.stream().mapToInt(Integer::intValue).toArray(),
InternalTypeInfo.of(inputRowType));
Arrays.sort(shuffleKeys);
}

RowDataKeySelector keySelector;
keySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, shuffleKeys, InternalTypeInfo.of(inputRowType));
final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
new KeyGroupStreamPartitioner<>(
keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
Transformation<RowData> partitionedTransform =
new PartitionTransformation<>(inputTransformation, partitioner);
createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config)
.fill(partitionedTransform);
if (singleParallelism) {
setSingletonParallelism(partitionedTransform);
} else {
partitionedTransform.setParallelism(inputTransformation.getParallelism(), false);
}
partitionedTransform.setParallelism(inputTransformation.getParallelism(), false);

OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
@@ -290,14 +288,6 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
false);
transform.setStateKeySelector(keySelector);
transform.setStateKeyType(keySelector.getProducedType());
if (singleParallelism) {
setSingletonParallelism(transform);
}
return transform;
}

private void setSingletonParallelism(Transformation<RowData> transformation) {
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
}
}
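
The core of the change in createSyncLookupJoinWithState is the shuffle-key choice shown above. A condensed sketch of that logic (the class and method names here are ours; the real code inlines this and sorts the array in place):

import java.util.Arrays;
import java.util.stream.IntStream;

final class ShuffleKeySelection {
    /** Prefer the input's upsert key; fall back to all columns when it is unknown. */
    static int[] selectShuffleKeys(int[] inputUpsertKey, int fieldCount) {
        if (inputUpsertKey == null || inputUpsertKey.length == 0) {
            // No upsert key is known, so shuffle by every input column: equal
            // rows (and thus matching -U/+U pairs) still land on one subtask.
            return IntStream.range(0, fieldCount).toArray();
        }
        // Sort for a deterministic ascending order.
        int[] shuffleKeys = Arrays.copyOf(inputUpsertKey, inputUpsertKey.length);
        Arrays.sort(shuffleKeys);
        return shuffleKeys;
    }
}

Since constant lookup keys no longer lead to an empty key selector, the singleton-parallelism special case is removed as well, which is why the compiled-plan JSONs further down change the operator parallelism from 1 to the regular input parallelism (4 in those tests).
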
@@ -23,6 +23,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;
import java.util.Set;

/**
@@ -42,8 +43,17 @@ public class UpsertKeyUtil {
*/
@Nonnull
public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
return smallestKey(upsertKeys).orElse(new int[0]);
}

/**
* Returns the smallest key of the given upsert keys, wrapped in a java {@link Optional}.
* Different from {@link #getSmallestKey(Set)}, it returns an empty {@link Optional} when the
* input set is null or empty.
*/
public static Optional<int[]> smallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
if (null == upsertKeys || upsertKeys.isEmpty()) {
return new int[0];
return Optional.empty();
}
return upsertKeys.stream()
.map(ImmutableBitSet::toArray)
@@ -60,7 +70,6 @@ public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
}
}
return k2;
})
.get();
});
}
}
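
A usage sketch for the refactored utility (the class name and bit-set values are illustrative): getSmallestKey keeps its non-null contract, while the new smallestKey lets callers distinguish "no upsert key known" via an empty Optional:

import org.apache.calcite.util.ImmutableBitSet;
import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

class UpsertKeyUtilExample {
    public static void main(String[] args) {
        Set<ImmutableBitSet> keys = Collections.singleton(ImmutableBitSet.of(0, 2));

        int[] smallest = UpsertKeyUtil.getSmallestKey(keys);    // [0, 2]
        Optional<int[]> none = UpsertKeyUtil.smallestKey(null); // Optional.empty()

        System.out.println(smallest.length + ", " + none.isPresent()); // 2, false
    }
}
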
@@ -18,20 +18,22 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream

import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil}
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil, UpsertKeyUtil}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil

import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexProgram

import java.util
import java.util.Optional

import scala.collection.JavaConverters._

@@ -111,8 +113,25 @@ class StreamPhysicalLookupJoin(
asyncOptions.orNull,
retryOptions.orNull,
inputChangelogMode,
getUpsertKey.orElse(null),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val upsertKey = getUpsertKey
super
.explainTerms(pw)
.itemIf("upsertKey", util.Arrays.toString(upsertKey.orElse(null)), upsertKey.isPresent)
}

private def getUpsertKey: Optional[Array[Int]] = {
// no need to call getUpsertKeysInKeyGroupRange here because there is no exchange before the
// lookup join; the exchange is only added inside the xxExecLookupJoin node.
val inputUpsertKeys = FlinkRelMetadataQuery
.reuseOrCreate(cluster.getMetadataQuery)
.getUpsertKeys(inputRel)
UpsertKeyUtil.smallestKey(inputUpsertKeys)
}
}
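
For readers more at home in Java, a sketch equivalent to the Scala getUpsertKey above (the class and method names are ours): the node asks the planner's metadata query for the input's upsert keys and keeps the smallest one, with an empty Optional meaning the key is unknown:

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;

import java.util.Optional;
import java.util.Set;

final class UpsertKeyDerivation {
    static Optional<int[]> deriveUpsertKey(FlinkRelMetadataQuery fmq, RelNode input) {
        // No getUpsertKeysInKeyGroupRange needed: the exchange is added inside
        // the exec node itself, so there is no shuffle in front of the join.
        Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(input);
        return UpsertKeyUtil.smallestKey(upsertKeys);
    }
}

The new upsertKey item added to explainTerms is what makes the upsertKey=[[0]] attribute appear in the plan snapshots near the end of this diff.
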
@@ -41,6 +41,7 @@ public List<TableTestProgram> programs() {
LookupJoinTestPrograms.LOOKUP_JOIN_POST_FILTER,
LookupJoinTestPrograms.LOOKUP_JOIN_PRE_POST_FILTER,
LookupJoinTestPrograms.LOOKUP_JOIN_ASYNC_HINT,
LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT);
LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT,
LookupJoinTestPrograms.LOOKUP_JOIN_WITH_TRY_RESOLVE);
}
}
@@ -18,10 +18,12 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.util.Arrays;
import java.util.List;
@@ -104,6 +106,31 @@ public class LookupJoinTestPrograms {
)
.build();

static final SourceTestStep ORDERS_CDC =
SourceTestStep.newBuilder("orders_cdc_t")
.addOption("filterable-fields", "customer_id")
.addOption("changelog-mode", "I,UA,UB,D")
.addSchema(
"order_id INT",
"customer_id INT",
"total DOUBLE",
"order_time STRING",
"proc_time AS PROCTIME()")
.addSchema("PRIMARY KEY (order_id) NOT ENFORCED")
.producedBeforeRestore(
Row.of(1, 3, 44.44, "2020-10-10 00:00:01"),
Row.of(2, 5, 100.02, "2020-10-10 00:00:02"),
Row.of(4, 2, 92.61, "2020-10-10 00:00:04"),
Row.of(3, 1, 23.89, "2020-10-10 00:00:03"),
Row.of(6, 4, 7.65, "2020-10-10 00:00:06"),
Row.of(5, 2, 12.78, "2020-10-10 00:00:05"))
.producedAfterRestore(
Row.ofKind(RowKind.DELETE, 3, 1, 23.89, "2020-10-10 00:00:03"),
Row.ofKind(RowKind.INSERT, 3, 1, 33.01, "2020-10-10 01:01:06"),
Row.ofKind(RowKind.DELETE, 4, 2, 92.61, "2020-10-10 02:00:04"),
Row.ofKind(RowKind.INSERT, 7, 6, 17.58, "2020-10-10 03:20:07"))
.build();

static final List<String> SINK_SCHEMA =
Arrays.asList(
"order_id INT",
@@ -385,4 +412,48 @@ public class LookupJoinTestPrograms {
+ "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C "
+ "ON O.customer_id = C.id")
.build();

static final TableTestProgram LOOKUP_JOIN_WITH_TRY_RESOLVE =
TableTestProgram.of(
"lookup-join-with-try-resolve",
"validates lookup join with NUD try resolve strategy enabled")
.setupConfig(
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
.setupTableSource(CUSTOMERS)
.setupTableSource(ORDERS_CDC)
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema(
SINK_SCHEMA.stream()
.filter(field -> !field.equals("age INT"))
.collect(Collectors.toList()))
// sink's new pk requires determinism on column city
.addSchema("PRIMARY KEY (order_id, city) NOT ENFORCED")
.consumedBeforeRestore(
"+I[1, 44.44, 3, Claire, Austin, Texas, 73301]",
"+I[2, 100.02, 5, Jake, New York City, New York, 10001]",
"+I[4, 92.61, 2, Alice, San Francisco, California, 95016]",
"+I[3, 23.89, 1, Bob, Mountain View, California, 94043]",
"+I[6, 7.65, 4, Shannon, Boise, Idaho, 83701]",
"+I[5, 12.78, 2, Alice, San Francisco, California, 95016]")
.consumedAfterRestore(
"-D[3, 23.89, 1, Bob, Mountain View, California, 94043]",
"+I[3, 33.01, 1, Bob, San Jose, California, 94089]",
"-D[4, 92.61, 2, Alice, San Francisco, California, 95016]",
"+I[7, 17.58, 6, Joana, Atlanta, Georgia, 30033]")
.build())
.runSql(
"INSERT INTO sink_t SELECT "
+ "O.order_id, "
+ "O.total, "
+ "C.id, "
+ "C.name, "
+ "C.city, "
+ "C.state, "
+ "C.zipcode "
+ "FROM orders_cdc_t as O "
+ "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C "
+ "ON O.customer_id = C.id")
.build();
}
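
The new LOOKUP_JOIN_WITH_TRY_RESOLVE program exercises the fix under the TRY_RESOLVE non-deterministic-update strategy. Enabling the same switch in user code looks roughly like this (a minimal sketch; the actual pipeline is omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class TryResolveExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Ask the planner to resolve non-deterministic-update (NDU) issues;
        // for this lookup join it enables upsert materialization, and with
        // this fix the CDC input is shuffled by its upsert key.
        env.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                        OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
    }
}
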
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "LookupJoin[6]",
"pact" : "Operator",
"contents" : "[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : 8,
"ship_strategy" : "HASH",
Expand All @@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "Sink: Sink1[7]",
"pact" : "Data Sink",
"contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : 10,
"ship_strategy" : "FORWARD",
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "LookupJoin[]",
"pact" : "Operator",
"contents" : "[]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "HASH",
Expand All @@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "Sink: Sink1[]",
"pact" : "Data Sink",
"contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -29,12 +29,12 @@ on t1.a = t2.a and ndFunc(t2.b) > 100]]>
<![CDATA[
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c])
advice[1]: [WARNING] There exists non deterministic function: 'ndFunc' in condition: '>(ndFunc($1), 100)' which may cause wrong result in update pipeline.
related rel plan:
LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], changelogMode=[I,UB,UA,D])
LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])
@@ -72,7 +72,7 @@ No available advice...
<![CDATA[
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c], metadata=[]]], fields=[a, c])
advice[1]: [WARNING] You might want to enable upsert materialization for look up join operator by configuring ('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline.