[FLINK-34248] Implement restore tests for changelog normalize node
bvarghese1 authored and dawidwys committed Feb 15, 2024
1 parent 2298e53 commit e76ccdc
Showing 8 changed files with 677 additions and 0 deletions.
ChangelogNormalizeRestoreTest.java
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

import java.util.Arrays;
import java.util.List;

/** Restore tests for {@link StreamExecChangelogNormalize}. */
public class ChangelogNormalizeRestoreTest extends RestoreTestBase {

    public ChangelogNormalizeRestoreTest() {
        super(StreamExecChangelogNormalize.class);
    }

    @Override
    public List<TableTestProgram> programs() {
        return Arrays.asList(
                ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE,
                ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE_MINI_BATCH,
                ChangelogNormalizeTestPrograms.UPSERT_SOURCE);
    }
}
ChangelogNormalizeTestPrograms.java
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;

/** {@link TableTestProgram} definitions for testing {@link StreamExecChangelogNormalize}. */
public class ChangelogNormalizeTestPrograms {

    static final String[] SOURCE_SCHEMA = {
        "a VARCHAR", "b INT NOT NULL", "c VARCHAR", "PRIMARY KEY(a) NOT ENFORCED"
    };

    static final String[] SINK_SCHEMA = {"a VARCHAR", "b INT", "c VARCHAR"};

    // Change events fed to the source before the savepoint is taken.
    static final Row[] BEFORE_DATA = {
        Row.ofKind(RowKind.INSERT, "one", 1, "a"),
        Row.ofKind(RowKind.INSERT, "two", 2, "b"),
        Row.ofKind(RowKind.UPDATE_BEFORE, "one", 1, "a"),
        Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "aa"),
        Row.ofKind(RowKind.INSERT, "three", 3, "c"),
        Row.ofKind(RowKind.DELETE, "two", 2, "b"),
        Row.ofKind(RowKind.UPDATE_BEFORE, "three", 3, "c"),
        Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "cc"),
    };

    // Change events fed to the source after the job is restored from the savepoint.
    static final Row[] AFTER_DATA = {
        Row.ofKind(RowKind.INSERT, "four", 4, "d"),
        Row.ofKind(RowKind.INSERT, "five", 5, "e"),
        Row.ofKind(RowKind.UPDATE_BEFORE, "four", 4, "d"),
        Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "dd"),
        Row.ofKind(RowKind.INSERT, "six", 6, "f"),
        Row.ofKind(RowKind.DELETE, "six", 6, "f")
    };

    // Rows the sink must observe; the +I/-U/+U/-D prefixes are the RowKind short strings.
    static final String[] BEFORE_OUTPUT = {
        "+I[one, 1, a]",
        "+I[two, 2, b]",
        "-U[one, 1, a]",
        "+U[one, 1, aa]",
        "+I[three, 3, c]",
        "-D[two, 2, b]",
        "-U[three, 3, c]",
        "+U[three, 3, cc]"
    };

    static final String[] AFTER_OUTPUT = {
        "+I[four, 4, d]",
        "+I[five, 5, e]",
        "-U[four, 4, d]",
        "+U[four, 4, dd]",
        "+I[six, 6, f]",
        "-D[six, 6, f]"
    };

    static final TableTestProgram CHANGELOG_SOURCE =
            TableTestProgram.of(
                            "changelog-normalize-source", "validates changelog normalize source")
                    .setupConfig(
                            ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
                    .setupTableSource(
                            SourceTestStep.newBuilder("source_t")
                                    .addOption("changelog-mode", "I,UA,UB,D")
                                    .addSchema(SOURCE_SCHEMA)
                                    .producedBeforeRestore(BEFORE_DATA)
                                    .producedAfterRestore(AFTER_DATA)
                                    .build())
                    .setupTableSink(
                            SinkTestStep.newBuilder("sink_t")
                                    .addSchema(SINK_SCHEMA)
                                    .consumedBeforeRestore(BEFORE_OUTPUT)
                                    .consumedAfterRestore(AFTER_OUTPUT)
                                    .build())
                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
                    .build();
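For context, table.exec.source.cdc-events-duplicate is the switch this program
(and the two below) relies on: it tells the planner that the CDC source may
deliver each change event more than once, and the planner then inserts a
ChangelogNormalize node keyed on the declared PRIMARY KEY to deduplicate. A
minimal sketch of setting the same flag outside the test harness (the class and
the setup around the option are illustrative, not part of this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public final class CdcDedupSketch {
    public static void main(String[] args) {
        // Illustrative setup; only the option itself comes from the program above.
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Declare that CDC events may arrive duplicated; the planner reacts by
        // adding a ChangelogNormalize node on the PRIMARY KEY to drop duplicates.
        env.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
    }
}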

    static final TableTestProgram CHANGELOG_SOURCE_MINI_BATCH =
            TableTestProgram.of(
                            "changelog-normalize-source-mini-batch",
                            "validates changelog normalize source with mini batch")
                    .setupConfig(
                            ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                    .setupConfig(
                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
                            Duration.ofSeconds(10))
                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 2L)
                    .setupTableSource(
                            SourceTestStep.newBuilder("source_t")
                                    .addOption("changelog-mode", "I,UA,UB,D")
                                    .addSchema(SOURCE_SCHEMA)
                                    .producedBeforeRestore(BEFORE_DATA)
                                    .producedAfterRestore(AFTER_DATA)
                                    .build())
                    .setupTableSink(
                            SinkTestStep.newBuilder("sink_t")
                                    .addSchema(SINK_SCHEMA)
                                    .consumedBeforeRestore(BEFORE_OUTPUT)
                                    .consumedAfterRestore(AFTER_OUTPUT)
                                    .build())
                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
                    .build();
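The mini-batch variant layers three more options on top. They make the operator
buffer input rows (here: up to 2 rows or 10 seconds) before touching state, and
they are why the compiled plan later in this diff contains a MiniBatchAssigner
node and mini-batch entries in the ChangelogNormalize configuration. A sketch of
the equivalent user-facing configuration (class name illustrative):

import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public final class MiniBatchSetupSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Enable mini-batch processing ...
        env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
        // ... flushing a buffered batch at the latest after 10 seconds ...
        env.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
                        Duration.ofSeconds(10));
        // ... or as soon as 2 rows have been buffered, as in the test program.
        env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 2L);
    }
}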

    static final TableTestProgram UPSERT_SOURCE =
            TableTestProgram.of(
                            "changelog-normalize-upsert", "validates changelog normalize upsert")
                    .setupConfig(
                            ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
                    .setupTableSource(
                            SourceTestStep.newBuilder("source_t")
                                    // Upsert mode: the source emits no UPDATE_BEFORE events,
                                    // so ChangelogNormalize must reconstruct them from state.
                                    .addOption("changelog-mode", "I,UA,D")
                                    .addSchema(SOURCE_SCHEMA)
                                    .producedBeforeRestore(
                                            Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "a"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "two", 2, "b"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "aa"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "c"),
                                            Row.ofKind(RowKind.DELETE, "two", 2, "b"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "cc"))
                                    .producedAfterRestore(
                                            Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "d"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "five", 5, "e"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "six", 6, "f"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "five", 5, "ee"),
                                            Row.ofKind(RowKind.DELETE, "six", 6, "f"),
                                            Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "dd"))
                                    .build())
                    .setupTableSink(
                            SinkTestStep.newBuilder("sink_t")
                                    .addSchema(SINK_SCHEMA)
                                    // The first UA per key surfaces as +I; every later UA
                                    // surfaces as a synthesized -U followed by +U.
                                    .consumedBeforeRestore(BEFORE_OUTPUT)
                                    .consumedAfterRestore(
                                            "+I[four, 4, d]",
                                            "+I[five, 5, e]",
                                            "+I[six, 6, f]",
                                            "-U[five, 5, e]",
                                            "+U[five, 5, ee]",
                                            "-D[six, 6, f]",
                                            "-U[four, 4, d]",
                                            "+U[four, 4, dd]")
                                    .build())
                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
                    .build();
}
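Why the upsert expectations above differ from BEFORE_OUTPUT/AFTER_OUTPUT: with
changelog-mode I,UA,D the source never emits UPDATE_BEFORE, so ChangelogNormalize
keeps the latest row per key in state, turns the first upsert for a key into +I,
and synthesizes the -U/+U pair for every later upsert. A toy per-key model of
that behavior (an illustration, not the actual StreamExecChangelogNormalize
operator, which also handles keyed state backends, TTL, mini-batching, and
duplicate-event filtering):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

final class NormalizeSketch {

    private final Map<Object, Row> lastRowPerKey = new HashMap<>();

    void onUpsert(Object key, Row newRow) {
        Row previous = lastRowPerKey.put(key, newRow);
        if (previous == null) {
            emit(RowKind.INSERT, newRow); // first value for this key: +I
        } else {
            emit(RowKind.UPDATE_BEFORE, previous); // synthesized -U
            emit(RowKind.UPDATE_AFTER, newRow); // +U
        }
    }

    void onDelete(Object key) {
        Row previous = lastRowPerKey.remove(key);
        if (previous != null) {
            emit(RowKind.DELETE, previous); // -D carries the last known value
        }
    }

    private void emit(RowKind kind, Row row) {
        Row out = Row.copy(row);
        out.setKind(kind);
        System.out.println(out); // e.g. "+U[five, 5, ee]"
    }
}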
Compiled plan JSON for the changelog-normalize-source-mini-batch program
@@ -0,0 +1,178 @@
{
  "flinkVersion" : "1.19",
  "nodes" : [ {
    "id" : 6,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`source_t`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "a",
              "dataType" : "VARCHAR(2147483647) NOT NULL"
            }, {
              "name" : "b",
              "dataType" : "INT NOT NULL"
            }, {
              "name" : "c",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ],
            "primaryKey" : {
              "name" : "PK_a",
              "type" : "PRIMARY_KEY",
              "columns" : [ "a" ]
            }
          },
          "partitionKeys" : [ ]
        }
      }
    },
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
    "inputProperties" : [ ]
  }, {
    "id" : 7,
    "type" : "stream-exec-mini-batch-assigner_1",
    "miniBatchInterval" : {
      "interval" : 10000,
      "mode" : "ProcTime"
    },
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
  }, {
    "id" : 8,
    "type" : "stream-exec-drop-update-before_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "DropUpdateBefore"
  }, {
    "id" : 9,
    "type" : "stream-exec-exchange_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "HASH",
        "keys" : [ 0 ]
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "Exchange(distribution=[hash[a]])"
  }, {
    "id" : 10,
    "type" : "stream-exec-changelog-normalize_1",
    "configuration" : {
      "table.exec.mini-batch.enabled" : "true",
      "table.exec.mini-batch.size" : "2"
    },
    "uniqueKeys" : [ 0 ],
    "generateUpdateBefore" : true,
    "state" : [ {
      "index" : 0,
      "ttl" : "0 ms",
      "name" : "changelogNormalizeState"
    } ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "ChangelogNormalize(key=[a])"
  }, {
    "id" : 11,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "a",
              "dataType" : "VARCHAR(2147483647)"
            }, {
              "name" : "b",
              "dataType" : "INT"
            }, {
              "name" : "c",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ]
        }
      }
    },
    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
    "inputUpsertKey" : [ 0 ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
  } ],
  "edges" : [ {
    "source" : 6,
    "target" : 7,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 7,
    "target" : 8,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 8,
    "target" : 9,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 9,
    "target" : 10,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 10,
    "target" : 11,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
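The JSON above is the serialized compiled plan that the restore test pins the
exec-node format against. A hedged sketch of how such a plan is produced and
restored through the public API (the CREATE TABLE statements for source_t and
sink_t are elided; class name illustrative):

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public final class CompiledPlanSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // ... CREATE TABLE source_t / sink_t as in the test programs ...

        // Compile the statement into a versioned exec-node plan like the JSON above.
        CompiledPlan plan =
                env.compilePlanSql("INSERT INTO sink_t SELECT a, b, c FROM source_t");
        String json = plan.asJsonString();

        // Later (possibly on a newer Flink version): load and run the stored plan.
        env.loadPlan(PlanReference.fromJsonString(json)).execute();
    }
}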
Binary file not shown.
[Diffs for the remaining changed files were not loaded in this view.]
