Skip to content

Commit

Permalink
[FLINK-18207][FLINK-18185][table] Fix datagen connector exactly-once …
Browse files Browse the repository at this point in the history
…bug and validation message


This closes apache#12544
  • Loading branch information
JingsongLi authored Jun 9, 2020
1 parent 8a7093b commit d1ed9bf
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testCheckpointRestore() throws Exception {
@Override
public void run() {
try {
source1.run(new BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
source1.run(new BlockingSourceContext<>("1", latchToTrigger1, latchToWait1, outputCollector, 21));
}
catch (Throwable t) {
t.printStackTrace();
Expand All @@ -93,7 +93,7 @@ public void run() {
@Override
public void run() {
try {
source2.run(new BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
source2.run(new BlockingSourceContext<>("2", latchToTrigger2, latchToWait2, outputCollector, 32));
}
catch (Throwable t) {
t.printStackTrace();
Expand Down Expand Up @@ -139,7 +139,7 @@ public void run() {
@Override
public void run() {
try {
source3.run(new BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
source3.run(new BlockingSourceContext<>("3", latchToTrigger3, latchToWait3, outputCollector, 3));
}
catch (Throwable t) {
t.printStackTrace();
Expand Down Expand Up @@ -186,22 +186,22 @@ public void run() {
/**
* Test SourceContext.
*/
public static class BlockingSourceContext implements SourceFunction.SourceContext<Long> {
public static class BlockingSourceContext<T> implements SourceFunction.SourceContext<T> {

private final String name;

private final Object lock;
private final OneShotLatch latchToTrigger;
private final OneShotLatch latchToWait;
private final ConcurrentHashMap<String, List<Long>> collector;
private final ConcurrentHashMap<String, List<T>> collector;

private final int threshold;
private int counter = 0;

private final List<Long> localOutput;
private final List<T> localOutput;

public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLatch latchToWait,
ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
ConcurrentHashMap<String, List<T>> output, int elemToFire) {
this.name = name;
this.lock = new Object();
this.latchToTrigger = latchToTrigger;
Expand All @@ -210,19 +210,19 @@ public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLa
this.threshold = elemToFire;

this.localOutput = new ArrayList<>();
List<Long> prev = collector.put(name, localOutput);
List<T> prev = collector.put(name, localOutput);
if (prev != null) {
Assert.fail();
}
}

@Override
public void collectWithTimestamp(Long element, long timestamp) {
public void collectWithTimestamp(T element, long timestamp) {
collect(element);
}

@Override
public void collect(Long element) {
public void collect(T element) {
localOutput.add(element);
if (++counter == threshold) {
latchToTrigger.trigger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* Tests for {@link DataGeneratorSource}.
Expand Down Expand Up @@ -99,39 +100,44 @@ public void close() {
public void testSequenceCheckpointRestore() throws Exception {
	// Checkpoint/restore test for a long sequence generator: builds the set of values
	// the test expects to see and delegates the actual run to the shared
	// innerTestDataGenCheckpointRestore helper.
	final int initElement = 0;
	final int maxElement = 100;

	// Every value in [initElement, maxElement], both bounds inclusive.
	final Set<Long> expectedOutput = new HashSet<>();
	for (long i = initElement; i <= maxElement; i++) {
		expectedOutput.add(i);
	}

	// A supplier (rather than a single instance) is handed over so the helper can
	// obtain a fresh source over the same range each time it calls get().
	DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(
		() -> new DataGeneratorSource<>(
			SequenceGenerator.longGenerator(initElement, maxElement)),
		expectedOutput);
}

final ConcurrentHashMap<String, List<Long>> outputCollector = new ConcurrentHashMap<>();
public static <T> void innerTestDataGenCheckpointRestore(
Supplier<DataGeneratorSource<T>> supplier,
Set<T> expectedOutput) throws Exception {
final int maxParallelsim = 2;
final ConcurrentHashMap<String, List<T>> outputCollector = new ConcurrentHashMap<>();
final OneShotLatch latchToTrigger1 = new OneShotLatch();
final OneShotLatch latchToWait1 = new OneShotLatch();
final OneShotLatch latchToTrigger2 = new OneShotLatch();
final OneShotLatch latchToWait2 = new OneShotLatch();

final DataGeneratorSource<Long> source1 = new DataGeneratorSource<>(
SequenceGenerator.longGenerator(initElement, maxElement));
StreamSource<Long, DataGeneratorSource<Long>> src1 = new StreamSource<>(source1);
final DataGeneratorSource<T> source1 = supplier.get();
StreamSource<T, DataGeneratorSource<T>> src1 = new StreamSource<>(source1);

final AbstractStreamOperatorTestHarness<Long> testHarness1 =
final AbstractStreamOperatorTestHarness<T> testHarness1 =
new AbstractStreamOperatorTestHarness<>(src1, maxParallelsim, 2, 0);
testHarness1.open();

final DataGeneratorSource<Long> source2 = new DataGeneratorSource<>(
SequenceGenerator.longGenerator(initElement, maxElement));
StreamSource<Long, DataGeneratorSource<Long>> src2 = new StreamSource<>(source2);
final DataGeneratorSource<T> source2 = supplier.get();
StreamSource<T, DataGeneratorSource<T>> src2 = new StreamSource<>(source2);

final AbstractStreamOperatorTestHarness<Long> testHarness2 =
final AbstractStreamOperatorTestHarness<T> testHarness2 =
new AbstractStreamOperatorTestHarness<>(src2, maxParallelsim, 2, 1);
testHarness2.open();

// run the source asynchronously
Thread runner1 = new Thread(() -> {
try {
source1.run(new BlockingSourceContext(
source1.run(new BlockingSourceContext<>(
"1", latchToTrigger1, latchToWait1, outputCollector, 21));
} catch (Throwable t) {
t.printStackTrace();
Expand All @@ -141,7 +147,7 @@ public void testSequenceCheckpointRestore() throws Exception {
// run the source asynchronously
Thread runner2 = new Thread(() -> {
try {
source2.run(new BlockingSourceContext(
source2.run(new BlockingSourceContext<>(
"2", latchToTrigger2, latchToWait2, outputCollector, 32));
}
catch (Throwable t) {
Expand All @@ -165,15 +171,14 @@ public void testSequenceCheckpointRestore() throws Exception {
testHarness2.snapshot(0L, 0L)
);

final DataGeneratorSource<Long> source3 = new DataGeneratorSource<>(
SequenceGenerator.longGenerator(initElement, maxElement));
StreamSource<Long, DataGeneratorSource<Long>> src3 = new StreamSource<>(source3);
final DataGeneratorSource<T> source3 = supplier.get();
StreamSource<T, DataGeneratorSource<T>> src3 = new StreamSource<>(source3);

final OperatorSubtaskState initState =
AbstractStreamOperatorTestHarness.repartitionOperatorState(
snapshot, maxParallelsim, 2, 1, 0);

final AbstractStreamOperatorTestHarness<Long> testHarness3 =
final AbstractStreamOperatorTestHarness<T> testHarness3 =
new AbstractStreamOperatorTestHarness<>(src3, maxParallelsim, 1, 0);
testHarness3.setup();
testHarness3.initializeState(initState);
Expand All @@ -186,7 +191,7 @@ public void testSequenceCheckpointRestore() throws Exception {
// run the source asynchronously
Thread runner3 = new Thread(() -> {
try {
source3.run(new BlockingSourceContext(
source3.run(new BlockingSourceContext<>(
"3", latchToTrigger3, latchToWait3, outputCollector, 3));
}
catch (Throwable t) {
Expand All @@ -199,15 +204,15 @@ public void testSequenceCheckpointRestore() throws Exception {
Assert.assertEquals(3, outputCollector.size()); // we have 3 tasks.

// test for at-most-once
Set<Long> dedupRes = new HashSet<>(Math.abs(maxElement - initElement) + 1);
for (Map.Entry<String, List<Long>> elementsPerTask: outputCollector.entrySet()) {
Set<T> dedupRes = new HashSet<>(expectedOutput.size());
for (Map.Entry<String, List<T>> elementsPerTask: outputCollector.entrySet()) {
String key = elementsPerTask.getKey();
List<Long> elements = outputCollector.get(key);
List<T> elements = outputCollector.get(key);

// this tests the correctness of the latches in the test
Assert.assertTrue(elements.size() > 0);

for (Long elem : elements) {
for (T elem : elements) {
if (!dedupRes.add(elem)) {
Assert.fail("Duplicate entry: " + elem);
}
Expand All @@ -219,7 +224,7 @@ public void testSequenceCheckpointRestore() throws Exception {
}

// test for exactly-once
Assert.assertEquals(Math.abs(initElement - maxElement) + 1, dedupRes.size());
Assert.assertEquals(expectedOutput.size(), dedupRes.size());

latchToWait1.trigger();
latchToWait2.trigger();
Expand Down
7 changes: 7 additions & 0 deletions flink-table/flink-table-api-java-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ under the License.
<version>${project.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
Expand Down Expand Up @@ -80,5 +82,10 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
Expand Down Expand Up @@ -173,13 +174,17 @@ public StringData next() {
}

private DataGenerator createSequenceGenerator(String name, DataType type, ReadableConfig options) {
OptionBuilder startKey = key(FIELDS + "." + name + "." + START);
OptionBuilder endKey = key(FIELDS + "." + name + "." + END);
String startKeyStr = FIELDS + "." + name + "." + START;
String endKeyStr = FIELDS + "." + name + "." + END;
OptionBuilder startKey = key(startKeyStr);
OptionBuilder endKey = key(endKeyStr);

options.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
() -> new ValidationException("Could not find required property '" + startKey + "'."));
() -> new ValidationException(
"Could not find required property '" + startKeyStr + "' for sequence generator."));
options.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
() -> new ValidationException("Could not find required property '" + endKey + "'."));
() -> new ValidationException(
"Could not find required property '" + endKeyStr + "' for sequence generator."));

switch (type.getLogicalType().getTypeRoot()) {
case CHAR:
Expand Down Expand Up @@ -291,6 +296,13 @@ public void open(
}
}

/**
 * Snapshots the state of every per-field generator by forwarding the call,
 * with the same context, to each entry of {@code fieldGenerators} in
 * iteration order.
 *
 * @param context snapshot context, passed unchanged to each field generator
 * @throws Exception if any field generator fails while snapshotting; generators
 *     after the failing one are not visited
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	for (DataGenerator fieldGenerator : fieldGenerators) {
		fieldGenerator.snapshotState(context);
	}
}

@Override
public boolean hasNext() {
for (DataGenerator generator : fieldGenerators) {
Expand Down
Loading

0 comments on commit d1ed9bf

Please sign in to comment.