Skip to content

Commit

Permalink
[FLINK-28357][datastream] Disallow null elements in StreamNode#typeSe…
Browse files Browse the repository at this point in the history
…rializersIn

Otherwise tasks cannot correctly determine the number of inputs. This was causing an
issue where a OneInputStreamTask that was restored as finished was waiting for
MAX_WATERMARK from two inputs, where the second one was null.

The problem was that {{FinishedOnRestoreInput#FinishedOnRestoreInput}} was being
constructed with the wrong number of inputs, because of an accidental {{null}}
passed from the {{StreamGraphGenerator}}.
  • Loading branch information
pnowojski committed Jul 7, 2022
1 parent 2656f95 commit 574ffa4
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -265,7 +266,13 @@ public void setOperatorDescription(String operatorDescription) {

/**
 * Stores the input type serializers of this node, dropping every {@code null} entry.
 *
 * <p>The argument array must be non-empty (enforced by {@code checkArgument}), but individual
 * elements may be {@code null}. Only the non-null serializers are kept, in their original
 * order, so the stored array can be shorter than the one passed in, and may even be empty
 * when every element is {@code null}.
 *
 * @param typeSerializersIn the input serializers; at least one element, elements may be null
 */
public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
    checkArgument(typeSerializersIn.length > 0);
    // Callers may pass null serializers, but users of e.g. getTypeSerializersIn would be
    // confused by an array of size two with all elements set to null, so nulls are filtered
    // out here instead of being stored.
    this.typeSerializersIn =
            Arrays.stream(typeSerializersIn)
                    .filter(typeSerializer -> typeSerializer != null)
                    .toArray(TypeSerializer<?>[]::new);
}

public TypeSerializer<?>[] getTypeSerializersIn() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.streaming.api.datastream;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Test that ensures watermarks are correctly propagated when some sources have finished. This
 * test has one short living source that finishes immediately. After the 5th checkpoint the job
 * fails over, and then the test makes sure that a watermark emitted from the other, still working
 * source around checkpoint 10 reaches the sink. Only once this happens is the long living source
 * allowed to exit. If the watermark is not propagated/silently swallowed (as for example in
 * FLINK-28357), the test is expected to livelock.
 */
public class FinishedSourcesWatermarkITCase extends TestLogger {

    /**
     * Lower bound that a watermark must exceed at the sink before the test may finish. It starts
     * at the MAX_WATERMARK timestamp and is lowered by {@link LongRunningSource} once a checkpoint
     * with an id greater than 10 completes.
     */
    private static final AtomicLong CHECKPOINT_10_WATERMARK =
            new AtomicLong(Watermark.MAX_WATERMARK.getTimestamp());

    /** Set by the sink once it has seen a watermark above {@link #CHECKPOINT_10_WATERMARK}. */
    private static final AtomicBoolean DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK = new AtomicBoolean();

    @Test
    public void testTwoConsecutiveFinishedTasksShouldPropagateMaxWatermark() throws Exception {
        // The coordination state is static, so reset it to make the test repeatable within a
        // single JVM; otherwise a stale ACK would let the long running source exit immediately.
        CHECKPOINT_10_WATERMARK.set(Watermark.MAX_WATERMARK.getTimestamp());
        DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.set(false);

        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        // disable chaining to make sure we will have two consecutive checkpoints with Task ==
        // FINISHED
        env.disableOperatorChaining();
        // Make sure that the short living source has plenty of time to finish before the 5th
        // checkpoint
        env.enableCheckpointing(200);

        // create our sources - one that will want to run forever, and another that finishes
        // immediately
        DataStream<String> runningStreamIn =
                env.addSource(new LongRunningSource(), "Long Running Source");
        DataStream<String> emptyStream =
                env.addSource(new ShortLivedEmptySource(), "Short Lived Source");

        // pass the empty stream through a simple map() function
        DataStream<String> mappedEmptyStream = emptyStream.map(v -> v).name("Empty Stream Map");

        // join the two streams together to see what watermark is reached during startup and after a
        // recovery
        runningStreamIn
                .connect(mappedEmptyStream)
                .process(new NoopCoProcessFunction())
                .name("Join")
                .addSink(new SinkWaitingForWatermark());

        env.execute();
    }

    /**
     * Sink that ignores records and only watches watermarks: it acknowledges as soon as it sees a
     * watermark strictly greater than {@link #CHECKPOINT_10_WATERMARK}.
     */
    private static class SinkWaitingForWatermark implements SinkFunction<String> {
        @Override
        public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
            // While the threshold still holds its initial MAX_WATERMARK timestamp, this
            // condition only becomes true after the source has lowered the threshold.
            if (watermark.getTimestamp() > CHECKPOINT_10_WATERMARK.get()) {
                DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.set(true);
            }
        }
    }

    /**
     * Source that emits no records, only monotonically non-decreasing wall-clock watermarks, until
     * it is cancelled or the sink has acknowledged a watermark. It also forces one failover when
     * checkpoint 5 completes.
     */
    private static class LongRunningSource extends RichSourceFunction<String>
            implements CheckpointListener {
        private volatile boolean isRunning = true;
        // NOTE(review): written in run() under the checkpoint lock and read in
        // notifyCheckpointComplete(); presumably both are serialized by that lock - confirm.
        private long lastEmittedWatermark;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (isRunning && !DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.get()) {
                synchronized (sourceContext.getCheckpointLock()) {
                    // max() keeps the emitted watermark from going backwards if the wall clock
                    // does.
                    lastEmittedWatermark =
                            Math.max(System.currentTimeMillis(), lastEmittedWatermark);
                    sourceContext.emitWatermark(new Watermark(lastEmittedWatermark));
                }
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (checkpointId == 5) {
                throw new RuntimeException("Force recovery");
            }
            if (checkpointId > 10) {
                // Atomically lower the threshold to the smallest watermark recorded so far.
                CHECKPOINT_10_WATERMARK.accumulateAndGet(lastEmittedWatermark, Math::min);
            }
        }
    }

    /** Source whose {@code run} method returns immediately without emitting anything. */
    private static class ShortLivedEmptySource extends RichSourceFunction<String> {
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {}

        @Override
        public void cancel() {}
    }

    /** Two-input function that drops every record from both inputs. */
    private static class NoopCoProcessFunction extends CoProcessFunction<String, String, String> {
        @Override
        public void processElement1(String val, Context context, Collector<String> collector) {}

        @Override
        public void processElement2(String val, Context context, Collector<String> collector) {}
    }
}

0 comments on commit 574ffa4

Please sign in to comment.