Skip to content

Commit

Permalink
IGNITE-15550 SQL Calcite: implement ARRAY, ARRAY_AGG, ARRAY_CONCAT_AG…
Browse files Browse the repository at this point in the history
…G and MAP support (apache#10059)
  • Loading branch information
ivandasch authored Jun 7, 2022
1 parent 5c9a8c7 commit a6067c6
Show file tree
Hide file tree
Showing 23 changed files with 1,528 additions and 834 deletions.
2 changes: 1 addition & 1 deletion docs/_docs/SQL/sql-calcite.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ The Calcite-based SQL engine currently supports:
|Group | Functions list

|Aggregate functions
|`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, `ANY_VALUE`, `LISTAGG`, `GROUP_CONCAT`, `STRING_AGG`
|`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, `ANY_VALUE`, `LISTAGG`, `GROUP_CONCAT`, `STRING_AGG`, `ARRAY_AGG`, `ARRAY_CONCAT_AGG`

|String functions
|`UPPER`, `LOWER`, `INITCAP`, `TO_BASE64`, `FROM_BASE64`, `MD5`, `SHA1`, `SUBSTRING`, `LEFT`, `RIGHT`, `REPLACE`, `TRANSLATE`, `CHR`, `CHAR_LENGTH`, `CHARACTER_LENGTH`, `LENGTH`, `CONCAT`, `OVERLAY`, `POSITION`, `ASCII`, `REPEAT`, `SPACE`, `STRCMP`, `SOUNDEX`, `DIFFERENCE`, `REVERSE`, `TRIM`, `LTRIM`, `RTRIM`, `REGEXP_REPLACE`
Expand Down
18 changes: 9 additions & 9 deletions modules/calcite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
Expand Down Expand Up @@ -278,7 +286,7 @@
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>compile</phase>
<phase>validate</phase>
<goals>
<goal>generate</goal>
</goals>
Expand Down Expand Up @@ -307,14 +315,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
3 changes: 2 additions & 1 deletion modules/calcite/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ data: {
# "ANY"
"ARE"
"ARRAY"
# "ARRAY_AGG" # not a keyword in Calcite
"ARRAY_AGG"
"ARRAY_CONCAT_AGG"
"ARRAY_MAX_CARDINALITY"
"AS"
"ASC"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractSetOpNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CollectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
Expand All @@ -63,6 +64,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.rel.UnionAllNode;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCollect;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
Expand Down Expand Up @@ -757,6 +759,19 @@ else if (rel instanceof Intersect)
return node;
}

/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteCollect rel) {
RelDataType outType = rel.getRowType();

CollectNode<Row> node = new CollectNode<>(ctx, outType);

Node<Row> input = visit(rel.getInput());

node.register(input);

return node;
}

/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteRel rel) {
return rel.accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -85,6 +85,8 @@ private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
case "ANY_VALUE":
return () -> new AnyVal<>(call, hnd);
case "LISTAGG":
case "ARRAY_AGG":
case "ARRAY_CONCAT_AGG":
return listAggregateSupplier(call, ctx);
default:
throw new AssertionError(call.getAggregation().getName());
Expand All @@ -98,7 +100,16 @@ private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
) {
RowHandler<Row> hnd = ctx.rowHandler();

Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
Supplier<Accumulator<Row>> accSup;
String aggName = call.getAggregation().getName();
if ("LISTAGG".equals(aggName))
accSup = () -> new ListAggAccumulator<>(call, hnd);
else if ("ARRAY_CONCAT_AGG".equals(aggName))
accSup = () -> new ArrayConcatAggregateAccumulator<>(call, hnd);
else if ("ARRAY_AGG".equals(aggName))
accSup = () -> new ArrayAggregateAccumulator<>(call, hnd);
else
throw new AssertionError(call.getAggregation().getName());

if (call.getCollation() != null && !call.getCollation().getFieldCollations().isEmpty()) {
Comparator<Row> cmp = ctx.expressionFactory().comparator(call.getCollation());
Expand Down Expand Up @@ -1082,55 +1093,85 @@ private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cm
}

/** */
private static class ListAggAccumulator<Row> extends AbstractAccumulator<Row> {
/** Default separator. */
private static final String DEFAULT_SEPARATOR = ",";

/** */
private final List<Row> list;

private abstract static class AggAccumulator<Row> extends AbstractAccumulator<Row> implements Iterable<Row> {
/** */
private final boolean isDfltSep;
private final List<Row> buf;

/** */
public ListAggAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
protected AggAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
super(aggCall, hnd);

isDfltSep = aggCall.getArgList().size() <= 1;

list = new ArrayList<>();
buf = new ArrayList<>();
}

/** {@inheritDoc} */
@Override public void add(Row row) {
if (row == null || get(0, row) == null)
if (row == null)
return;

list.add(row);
buf.add(row);
}

/** {@inheritDoc} */
@Override public void apply(Accumulator<Row> other) {
ListAggAccumulator<Row> other0 = (ListAggAccumulator<Row>)other;
AggAccumulator<Row> other0 = (AggAccumulator<Row>)other;

list.addAll(other0.list);
buf.addAll(other0.buf);
}

/** {@inheritDoc} */
@Override public Iterator<Row> iterator() {
return buf.iterator();
}

/** */
public boolean isEmpty() {
return buf.isEmpty();
}

/** */
public int size() {
return buf.size();
}
}

/** */
private static class ListAggAccumulator<Row> extends AggAccumulator<Row> {
/** Default separator. */
private static final String DEFAULT_SEPARATOR = ",";

/** */
private final boolean isDfltSep;

/** */
public ListAggAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
super(aggCall, hnd);

isDfltSep = aggCall.getArgList().size() <= 1;
}

/** {@inheritDoc} */
@Override public Object end() {
if (list.isEmpty())
if (isEmpty())
return null;

StringBuilder builder = new StringBuilder();
StringBuilder builder = null;

for (Row row: this) {
Object val = get(0, row);

if (val == null)
continue;

if (builder == null)
builder = new StringBuilder();

for (Row row: list) {
if (builder.length() != 0)
builder.append(extractSeparator(row));

builder.append(Objects.toString(get(0, row)));
builder.append(val);
}

return builder.toString();
return builder != null ? builder.toString() : null;
}

/** */
Expand Down Expand Up @@ -1158,6 +1199,79 @@ private String extractSeparator(Row row) {
}
}

/** */
private static class ArrayAggregateAccumulator<Row> extends AggAccumulator<Row> {
/** */
public ArrayAggregateAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
super(aggCall, hnd);
}

/** {@inheritDoc} */
@Override public Object end() {
if (size() == 0)
return null;

List<Object> result = new ArrayList<>(size());
for (Row row: this)
result.add(get(0, row));

return result;
}

/** {@inheritDoc} */
@Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
return F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(ANY), true));
}

/** {@inheritDoc} */
@Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
return typeFactory.createTypeWithNullability(typeFactory.createArrayType(
typeFactory.createSqlType(ANY), -1), true);
}
}

/** */
private static class ArrayConcatAggregateAccumulator<Row> extends AggAccumulator<Row> {
/** */
public ArrayConcatAggregateAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
super(aggCall, hnd);
}

/** {@inheritDoc} */
@Override public Object end() {
if (size() == 0)
return null;

List<Object> result = new ArrayList<>(size());

for (Row row: this) {
List<Object> arr = get(0, row);

if (F.isEmpty(arr))
continue;

result.addAll(arr);
}

if (result.isEmpty())
return null;

return result;
}

/** {@inheritDoc} */
@Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
return F.asList(typeFactory.createTypeWithNullability(typeFactory.createArrayType(
typeFactory.createSqlType(ANY), -1), true));
}

/** {@inheritDoc} */
@Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
return typeFactory.createTypeWithNullability(typeFactory.createArrayType(
typeFactory.createSqlType(ANY), -1), true);
}
}

/** */
private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> {
/** */
Expand Down
Loading

0 comments on commit a6067c6

Please sign in to comment.