Commit

[fix][dingo-executor] Add value union rule and partition execute concurrent level
githubgxll authored and guojn1 committed Sep 26, 2024
1 parent 302955a commit 6c5f0b4
Showing 26 changed files with 260 additions and 118 deletions.
@@ -200,6 +200,7 @@ public final class DingoRules {
DINGO_VALUES_REDUCE_RULE_PROJECT,
DINGO_VALUES_RULE,
LOGICAL_DINGO_VALUE_RULE,
DINGO_VALUES_UNION_RULE,
DINGO_FUNCTION_SCAN_RULE,
DINGO_VECTOR_INDEX_RULE,
DINGO_VECTOR_JOIN_RULE,
@@ -29,7 +29,6 @@

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

@Value.Enclosing
public class DingoValuesUnionRule extends RelRule<DingoValuesUnionRule.Config> implements SubstitutionRule {
@@ -40,29 +39,25 @@ private DingoValuesUnionRule(Config config) {
@Override
public void onMatch(@NonNull RelOptRuleCall call) {
Union union = call.rel(0);
LogicalDingoValues value0 = call.rel(1);
LogicalDingoValues value1 = call.rel(2);
List<Object[]> tuples = new LinkedList<>(value0.getTuples());
tuples.addAll(value1.getTuples());
LogicalDingoValues values = new LogicalDingoValues(
union.getCluster(),
union.getTraitSet(),
union.getRowType(),
tuples
);
// TODO: bug if there are duplicate inputs.
List<RelNode> otherInputs = union.getInputs().stream()
.filter(n -> !((RelSubset) n).getRelList().contains(value0)
&& !((RelSubset) n).getRelList().contains(value1))
.collect(Collectors.toList());
if (otherInputs.isEmpty()) {
List<Object[]> tuples = new LinkedList<>();
for (RelNode input : union.getInputs()) {
if (input instanceof RelSubset) {
List<RelNode> relList = ((RelSubset) input).getRelList();
for (RelNode relNode : relList) {
if (relNode instanceof LogicalDingoValues) {
tuples.addAll(((LogicalDingoValues) relNode).getTuples());
}
}
}
}
if (tuples.size() == union.getInputs().size()) {
LogicalDingoValues values = new LogicalDingoValues(
union.getCluster(),
union.getTraitSet(),
union.getRowType(),
tuples
);
call.transformTo(values);
} else {
otherInputs.add(values);
call.transformTo(LogicalUnion.create(
otherInputs,
union.all
));
}
}

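For orientation, here is a minimal self-contained sketch of the decision the rewritten onMatch makes. It is a toy model using plain collections, not the actual Calcite/Dingo classes; the class ValuesUnionMergeSketch, the method tryMerge, and the null-for-non-values convention are illustrative assumptions, not part of the repository.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public final class ValuesUnionMergeSketch {

    // Each entry stands for one Union input: the tuples of its values node,
    // or null if that input did not resolve to a values node.
    static Optional<List<Object[]>> tryMerge(List<List<Object[]>> valuesPerInput) {
        List<Object[]> tuples = new ArrayList<>();
        for (List<Object[]> values : valuesPerInput) {
            if (values != null) {
                tuples.addAll(values);
            }
        }
        // Mirrors the rule's `tuples.size() == union.getInputs().size()` guard:
        // merge only when the tuples gathered from values inputs account for every
        // Union input; otherwise leave the original Union in place.
        return tuples.size() == valuesPerInput.size() ? Optional.of(tuples) : Optional.empty();
    }

    public static void main(String[] args) {
        List<List<Object[]>> inputs = Arrays.asList(
            Collections.singletonList(new Object[]{1, "a"}),   // like VALUES (1, 'a')
            Collections.singletonList(new Object[]{2, "b"})    // like VALUES (2, 'b')
        );
        // Both inputs contribute one tuple each, so the sketch reports a merge of 2 tuples.
        tryMerge(inputs).ifPresent(t -> System.out.println("merged " + t.size() + " tuples"));
    }
}

In the real rule the merged tuples become a single LogicalDingoValues that replaces the Union; when the count check fails, no transformation is emitted.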
@@ -81,6 +81,7 @@
import io.dingodb.calcite.visitor.function.DingoValuesVisitFun;
import io.dingodb.calcite.visitor.function.DingoVectorStreamingVisitFun;
import io.dingodb.calcite.visitor.function.DingoVectorVisitFun;
import io.dingodb.common.ExecuteVariables;
import io.dingodb.common.Location;
import io.dingodb.common.log.LogUtils;
import io.dingodb.exec.base.IdGenerator;
@@ -115,26 +116,26 @@ public class DingoJobVisitor implements DingoRelVisitor<Collection<Vertex>> {
private boolean isScan;

@Getter
private boolean isJoinConcurrency;
private ExecuteVariables executeVariables;

private DingoJobVisitor(Job job, IdGenerator idGenerator, Location currentLocation,
ITransaction transaction, SqlKind kind, boolean isJoinConcurrency) {
ITransaction transaction, SqlKind kind, ExecuteVariables executeVariables) {
this.job = job;
this.idGenerator = idGenerator;
this.currentLocation = currentLocation;
this.transaction = transaction;
this.kind = kind;
this.isJoinConcurrency = isJoinConcurrency;
this.executeVariables = executeVariables;
}

public static void renderJob(Job job, RelNode input, Location currentLocation) {
renderJob(job, input, currentLocation, false, null, null, false);
renderJob(job, input, currentLocation, false, null, null, new ExecuteVariables());
}

public static void renderJob(Job job, RelNode input, Location currentLocation,
boolean checkRoot, ITransaction transaction, SqlKind kind, boolean isJoinConcurrency) {
boolean checkRoot, ITransaction transaction, SqlKind kind, ExecuteVariables executeVariables) {
IdGenerator idGenerator = new IdGeneratorImpl(job.getJobId().seq);
DingoJobVisitor visitor = new DingoJobVisitor(job, idGenerator, currentLocation, transaction, kind, isJoinConcurrency);
DingoJobVisitor visitor = new DingoJobVisitor(job, idGenerator, currentLocation, transaction, kind, executeVariables);
Collection<Vertex> outputs = dingo(input).accept(visitor);
if (checkRoot && !outputs.isEmpty()) {
throw new IllegalStateException("There root of plan must be `DingoRoot`.");
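The hunks above and below replace the bare isJoinConcurrency flag with an io.dingodb.common.ExecuteVariables holder, but that class itself is not among the shown files. Below is a minimal sketch of its assumed shape, reconstructed only from what this commit uses: a no-arg constructor (see new ExecuteVariables() in renderJob), getConcurrencyLevel(), and isJoinConcurrency(). Field names, default values, and the setters are assumptions, not the actual source.

// Assumed shape of io.dingodb.common.ExecuteVariables, inferred from this diff only;
// defaults and setters are guesses, not the real class.
public class ExecuteVariables {
    // Parallelism handed to the CALC_DISTRIBUTION / CALC_DISTRIBUTION_1 source params
    // for partition scans (assumed default of 1, i.e. no extra concurrency).
    private int concurrencyLevel = 1;
    // Whether hash-join work is fanned out across all computing locations
    // (the hash-join hunk further down calls isJoinConcurrency()).
    private boolean joinConcurrency = false;

    public ExecuteVariables() {
    }

    public int getConcurrencyLevel() {
        return concurrencyLevel;
    }

    public void setConcurrencyLevel(int concurrencyLevel) {
        this.concurrencyLevel = concurrencyLevel;
    }

    public boolean isJoinConcurrency() {
        return joinConcurrency;
    }

    public void setJoinConcurrency(boolean joinConcurrency) {
        this.joinConcurrency = joinConcurrency;
    }
}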
@@ -71,7 +71,9 @@ public static Collection<Vertex> visit(
null,
false,
false,
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION, distributionParam);
Task task = job.getOrCreate(currentLocation, idGenerator);
calcVertex.setId(idGenerator.getOperatorId(task.getId()));
@@ -111,7 +111,9 @@ public static Collection<Vertex> visit(
null,
false,
false,
keyTuple));
keyTuple,
visitor.getExecuteVariables().getConcurrencyLevel()
));

Task task;
if (transaction != null) {
@@ -109,7 +109,9 @@ public static LinkedList<Vertex> visit(
null,
false,
false,
keyTuple));
keyTuple,
visitor.getExecuteVariables().getConcurrencyLevel()
));

Task task;
if (transaction != null) {
@@ -94,7 +94,9 @@ private DingoIndexFullScanVisitFun() {
null,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION, distributionParam);

@@ -138,7 +138,9 @@ private DingoIndexRangeScanVisitFun() {
filter,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION_1, distributionParam);

@@ -97,7 +97,7 @@ private DingoIndexScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false),
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -108,7 +108,7 @@ private DingoIndexScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcDistributionVertex(rel, start, end, false),
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -118,7 +118,7 @@ private DingoIndexScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false),
(start, end) -> createCalcRangeDistributionVertex(rel, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -132,7 +132,7 @@ private DingoIndexScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcDistributionVertex(rel, start, end, false),
(start, end) -> createCalcDistributionVertex(rel, start, end, false, visitor),
partition.getStart(),
i < partitionNum - 1 ? partitions.get(i + 1).getStart() : null,
scanVertexCreator
@@ -269,7 +269,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
@NonNull DingoIndexScanWithRelOp rel,
byte[] startKey,
byte[] endKey,
boolean withEnd
boolean withEnd,
DingoJobVisitor visitor
) {
MetaService metaService = MetaService.root();
final IndexTable td = rel.getIndexTable();
@@ -292,7 +293,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
filter,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
distributionParam.setFilterRange(rel.isRangeScan());
@@ -303,7 +305,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
@NonNull DingoIndexScanWithRelOp rel,
byte[] startKey,
byte[] endKey,
boolean withEnd
boolean withEnd,
DingoJobVisitor visitor
) {
MetaService metaService = MetaService.root();
final IndexTable td = rel.getIndexTable();
@@ -334,7 +337,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
filter,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
distributionParam.setFilterRange(filterRange);
@@ -79,7 +79,9 @@ public static Collection<Vertex> visit(
null,
false,
false,
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION, distributionSourceParam);

for (RangeDistribution distribution
@@ -69,7 +69,9 @@ public static Collection<Vertex> visit(
null,
false,
rel.isNotBetween(),
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION, distributionParam);
Task task;
if (transaction != null) {
@@ -100,7 +100,7 @@ private DingoScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcRangeDistributionVertex(rel, tableInfo, start, end, false),
(start, end) -> createCalcRangeDistributionVertex(rel, tableInfo, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -113,7 +113,7 @@ private DingoScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcDistributionVertex(rel, tableInfo, start, end, false),
(start, end) -> createCalcDistributionVertex(rel, tableInfo, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -144,7 +144,7 @@ private DingoScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcRangeDistributionVertex(rel, tableInfo, start, end, false),
(start, end) -> createCalcRangeDistributionVertex(rel, tableInfo, start, end, false, visitor),
null,
null,
scanVertexCreator
@@ -157,7 +157,7 @@ private DingoScanWithRelOpVisitFun() {
outputs.add(createVerticesForRange(
task,
idGenerator,
(start, end) -> createCalcDistributionVertex(rel, tableInfo, start, end, false),
(start, end) -> createCalcDistributionVertex(rel, tableInfo, start, end, false, visitor),
partition.getStart(),
i < partitionNum - 1 ? partitions.get(i + 1).getStart() : null,
scanVertexCreator
@@ -298,7 +298,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
@NonNull NavigableMap<ComparableByteArray, RangeDistribution> ranges,
byte[] startKey,
byte[] endKey,
boolean withEnd
boolean withEnd,
DingoJobVisitor visitor
) {
final Table td = Objects.requireNonNull(rel.getTable().unwrap(DingoTable.class)).getTable();
// TODO: need to create range by filters.
@@ -312,7 +313,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
null,
false,
false,
null
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
return new Vertex(CALC_DISTRIBUTION_1, distributionParam);
}
@@ -322,7 +324,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
@NonNull TableInfo tableInfo,
byte[] startKey,
byte[] endKey,
boolean withEnd
boolean withEnd,
DingoJobVisitor visitor
) {
final Table td = Objects.requireNonNull(rel.getTable().unwrap(DingoTable.class)).getTable();
NavigableMap<ComparableByteArray, RangeDistribution> ranges = tableInfo.getRangeDistributions();
@@ -342,7 +345,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
filter,
false,
false,
null
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
return new Vertex(CALC_DISTRIBUTION_1, distributionParam);
}
@@ -352,7 +356,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
@NonNull TableInfo tableInfo,
byte[] startKey,
byte[] endKey,
boolean withEnd
boolean withEnd,
DingoJobVisitor visitor
) {
final Table td = Objects.requireNonNull(rel.getTable().unwrap(DingoTable.class)).getTable();
NavigableMap<ComparableByteArray, RangeDistribution> ranges = tableInfo.getRangeDistributions();
@@ -380,7 +385,8 @@ private static long getScanTs(@NonNull ITransaction transaction, SqlKind kind) {
filter,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
distributionParam.setKeepOrder(rel.getKeepSerialOrder());
distributionParam.setFilterRange(rel.isRangeScan());
@@ -229,7 +229,7 @@ public static Collection<Vertex> visit(
) {
List<Vertex> outputs = new LinkedList<>();
List<Location> locations = new ArrayList<>();
if (visitor.isJoinConcurrency()) {
if (visitor.getExecuteVariables().isJoinConcurrency()) {
locations.addAll(ClusterService.getDefault().getComputingLocations());
}
final HashStrategy hs = new SimpleHashStrategy();
@@ -98,7 +98,9 @@ private DingoTableScanVisitFun() {
filter,
Optional.mapOrGet(rel.getFilter(), __ -> __.getKind() == SqlKind.NOT, () -> false),
false,
null);
null,
visitor.getExecuteVariables().getConcurrencyLevel()
);
Vertex calcVertex = new Vertex(CALC_DISTRIBUTION, distributionParam);
Task task;
if (transaction != null) {
