[FLINK-34118] Implement restore tests for Sort node
bvarghese1 authored and dawidwys committed Feb 23, 2024
Parent: 6c8f3a0 · Commit: fe3d9a4
Showing 5 changed files with 421 additions and 2 deletions.
SortRestoreTest.java (new file)
@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

import java.util.Arrays;
import java.util.List;

/** Restore tests for {@link StreamExecSort}. */
public class SortRestoreTest extends RestoreTestBase {

    public SortRestoreTest() {
        super(StreamExecSort.class, AfterRestoreSource.NO_RESTORE);
    }

    @Override
    public List<TableTestProgram> programs() {
        return Arrays.asList(SortTestPrograms.SORT_ASC, SortTestPrograms.SORT_DESC);
    }
}
SortTestPrograms.java
@@ -18,14 +18,16 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

+import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;

/**
* {@link TableTestProgram} definitions for testing {@link
- * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit}.
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit} and {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort}.
*/
public class SortTestPrograms {

@@ -123,4 +125,48 @@ public class SortTestPrograms {
                                    .build())
                    .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a DESC LIMIT 3")
                    .build();
+
+    static final TableTestProgram SORT_ASC =
+            TableTestProgram.of("sort-asc", "validates sort node by sorting integers in asc mode")
+                    .setupConfig(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedValues(DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c BIGINT")
+                                    .consumedValues(
+                                            "+I[1, a, 5]",
+                                            "+I[2, a, 6]",
+                                            "+I[3, b, 7]",
+                                            "+I[4, b, 8]",
+                                            "+I[5, c, 9]",
+                                            "+I[6, c, 10]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a")
+                    .build();
+
+    static final TableTestProgram SORT_DESC =
+            TableTestProgram.of("sort-desc", "validates sort node by sorting integers in desc mode")
+                    .setupConfig(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedValues(DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c BIGINT")
+                                    .consumedValues(
+                                            "+I[6, c, 10]",
+                                            "+I[5, c, 9]",
+                                            "+I[4, b, 8]",
+                                            "+I[3, b, 7]",
+                                            "+I[2, a, 6]",
+                                            "+I[1, a, 5]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a DESC")
+                    .build();
}
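
Both new programs feed their source from a shared DATA constant defined earlier in SortTestPrograms, outside the hunk shown above. For orientation, here is a plausible reconstruction inferred from the consumedValues assertions; the real field's contents and ordering may differ:

// Hypothetical reconstruction of SortTestPrograms.DATA, inferred from the
// expected sink rows above; not copied from the actual file.
static final Row[] DATA = {
    Row.of(2, "a", 6),
    Row.of(4, "b", 8),
    Row.of(6, "c", 10),
    Row.of(1, "a", 5),
    Row.of(3, "b", 7),
    Row.of(5, "c", 9)
};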
RestoreTestBase.java
@@ -43,6 +43,7 @@
import org.apache.flink.table.test.program.TableTestProgramRunner;
import org.apache.flink.table.test.program.TestStep.TestKind;
import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;

import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.AfterEach;
@@ -63,6 +64,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -118,6 +120,7 @@ public EnumSet<TestKind> supportedSetupSteps() {
                TestKind.FUNCTION,
                TestKind.TEMPORAL_FUNCTION,
                TestKind.SOURCE_WITH_RESTORE_DATA,
+                TestKind.SOURCE_WITH_DATA,
                TestKind.SINK_WITH_RESTORE_DATA,
                TestKind.SINK_WITH_DATA);
    }
@@ -264,7 +267,11 @@ void testRestore(TableTestProgram program, ExecNodeMetadata metadata) throws Exception {
        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));

        for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
-            final String id = TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore);
+            final Collection<Row> data =
+                    afterRestoreSource == AfterRestoreSource.NO_RESTORE
+                            ? sourceTestStep.dataBeforeRestore
+                            : sourceTestStep.dataAfterRestore;
+            final String id = TestValuesTableFactory.registerData(data);
            final Map<String, String> options = new HashMap<>();
            options.put("connector", "values");
            options.put("data-id", id);
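
The afterRestoreSource field consulted in the new branch above is the enum value that subclasses pass to the RestoreTestBase constructor (SortRestoreTest passes NO_RESTORE). A hedged sketch of its likely shape follows; only NO_RESTORE's behaviour is evidenced by this commit, the other members and their comments are assumptions:

// Sketch of RestoreTestBase.AfterRestoreSource as implied by this commit.
public enum AfterRestoreSource {
    FINITE,     // assumed: source ends after emitting its post-restore data
    INFINITE,   // assumed: source keeps running after the restore
    NO_RESTORE  // no savepoint/restore cycle: the job runs once end-to-end,
                // so the source serves dataBeforeRestore, as selected above
}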
Compiled plan JSON for the sort-asc program (new file)
@@ -0,0 +1,164 @@
{
  "flinkVersion" : "1.19",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`source_t`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "a",
              "dataType" : "INT"
            }, {
              "name" : "b",
              "dataType" : "VARCHAR(2147483647)"
            }, {
              "name" : "c",
              "dataType" : "INT"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ]
        }
      }
    },
    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-exchange_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "SINGLETON"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
    "description" : "Exchange(distribution=[single])"
  }, {
    "id" : 3,
    "type" : "stream-exec-sort_1",
    "orderBy" : {
      "fields" : [ {
        "index" : 0,
        "isAscending" : true,
        "nullIsLast" : false
      } ]
    },
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
    "description" : "Sort(orderBy=[a ASC])"
  }, {
    "id" : 4,
    "type" : "stream-exec-calc_1",
    "projection" : [ {
      "kind" : "INPUT_REF",
      "inputIndex" : 0,
      "type" : "INT"
    }, {
      "kind" : "INPUT_REF",
      "inputIndex" : 1,
      "type" : "VARCHAR(2147483647)"
    }, {
      "kind" : "CALL",
      "syntax" : "SPECIAL",
      "internalName" : "$CAST$1",
      "operands" : [ {
        "kind" : "INPUT_REF",
        "inputIndex" : 2,
        "type" : "INT"
      } ],
      "type" : "BIGINT"
    } ],
    "condition" : null,
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
  }, {
    "id" : 5,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "a",
              "dataType" : "INT"
            }, {
              "name" : "b",
              "dataType" : "VARCHAR(2147483647)"
            }, {
              "name" : "c",
              "dataType" : "BIGINT"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ]
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 2,
    "target" : 3,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 3,
    "target" : 4,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 4,
    "target" : 5,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
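
For orientation, a plan file like the JSON above is the artifact of the Table API's compiled-plan round trip, which these restore tests exercise. A minimal sketch of producing and restoring such a plan follows; the registration of source_t/sink_t is omitted and the file path is illustrative, neither is part of this commit:

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanRoundTrip {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // source_t and sink_t must be registered first (DDL omitted here). Note
        // that a plain ORDER BY in streaming mode also needs the internal
        // non-temporal-sort option enabled, as the test programs do via setupConfig.
        CompiledPlan plan =
                tEnv.compilePlanSql("INSERT INTO sink_t SELECT * FROM source_t ORDER BY a");
        plan.writeToFile("/tmp/sort-asc.json"); // persists JSON like the plan above
        // A later job (e.g. after a Flink upgrade) restores and runs the plan:
        tEnv.loadPlan(PlanReference.fromFile("/tmp/sort-asc.json")).execute();
    }
}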