Skip to content

Commit

Permalink
[FLINK-34657] extract lineage info for stream API (apache#25056)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu authored Jul 20, 2024
1 parent 999b7a2 commit bcacea9
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
Expand Down Expand Up @@ -55,13 +56,18 @@ static <T> DataStreamSink<T> forSinkFunction(
StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);
final StreamExecutionEnvironment executionEnvironment =
inputStream.getExecutionEnvironment();
PhysicalTransformation<T> transformation =
LegacySinkTransformation<T> transformation =
new LegacySinkTransformation<>(
inputStream.getTransformation(),
"Unnamed",
sinkOperator,
executionEnvironment.getParallelism(),
false);
if (sinkFunction instanceof LineageVertexProvider) {
transformation.setLineageVertex(
((LineageVertexProvider) sinkFunction).getLineageVertex());
}

executionEnvironment.addOperator(transformation);
return new DataStreamSink<>(transformation);
}
Expand All @@ -82,6 +88,10 @@ public static <T> DataStreamSink<T> forSink(
executionEnvironment.getParallelism(),
false,
customSinkOperatorUidHashes);
if (sink instanceof LineageVertexProvider) {
transformation.setLineageVertex(((LineageVertexProvider) sink).getLineageVertex());
}

executionEnvironment.addOperator(transformation);
return new DataStreamSink<>(transformation);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.Internal;

import java.util.ArrayList;
import java.util.List;

/** Default implementation for {@link LineageVertex}. */
@Internal
public class DefaultLineageVertex implements LineageVertex {
private List<LineageDataset> lineageDatasets;

public DefaultLineageVertex() {
this.lineageDatasets = new ArrayList<>();
}

public void addLineageDataset(LineageDataset lineageDataset) {
this.lineageDatasets.add(lineageDataset);
}

@Override
public List<LineageDataset> datasets() {
return lineageDatasets;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;

import java.util.ArrayList;
import java.util.List;

/** Default implementation for {@link SourceLineageVertex}. */
@Internal
public class DefaultSourceLineageVertex implements SourceLineageVertex {
private Boundedness boundedness;
private List<LineageDataset> lineageDatasets;

public DefaultSourceLineageVertex(Boundedness boundedness) {
this.lineageDatasets = new ArrayList<>();
this.boundedness = boundedness;
}

public void addDataset(LineageDataset lineageDataset) {
this.lineageDatasets.add(lineageDataset);
}

@Override
public List<LineageDataset> datasets() {
return this.lineageDatasets;
}

@Override
public Boundedness boundedness() {
return this.boundedness;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
Expand Down Expand Up @@ -69,6 +71,7 @@ public LegacySourceTransformation(
super(name, outputType, parallelism, parallelismConfigured);
this.operatorFactory = checkNotNull(SimpleOperatorFactory.of(operator));
this.boundedness = checkNotNull(boundedness);
this.extractLineageVertex(operator);
}

/** Mutable for legacy sources in the Table API. */
Expand Down Expand Up @@ -105,4 +108,11 @@ public List<Transformation<?>> getInputs() {
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

private void extractLineageVertex(StreamSource<T, ?> operator) {
SourceFunction sourceFunction = operator.getUserFunction();
if (sourceFunction instanceof LineageVertexProvider) {
setLineageVertex(((LineageVertexProvider) sourceFunction).getLineageVertex());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,6 +65,7 @@ public SourceTransformation(
super(name, outputType, parallelism);
this.source = source;
this.watermarkStrategy = watermarkStrategy;
this.extractLineageVertex();
}

public SourceTransformation(
Expand All @@ -76,6 +78,7 @@ public SourceTransformation(
super(name, outputType, parallelism, parallelismConfigured);
this.source = source;
this.watermarkStrategy = watermarkStrategy;
this.extractLineageVertex();
}

public Source<OUT, SplitT, EnumChkT> getSource() {
Expand Down Expand Up @@ -118,4 +121,10 @@ public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
public String getCoordinatorListeningID() {
return coordinatorListeningID;
}

private void extractLineageVertex() {
if (source instanceof LineageVertexProvider) {
setLineageVertex(((LineageVertexProvider) source).getLineageVertex());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.lineage;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;

import static org.assertj.core.api.Assertions.assertThat;

/** Testing for lineage graph util. */
class LineageGraphUtilsTest {
private static final String SOURCE_DATASET_NAME = "LineageSource";
private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
private static final String SINK_DATASET_NAME = "LineageSink";
private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";

private static final String LEGACY_SOURCE_DATASET_NAME = "LineageSourceFunction";
private static final String LEGACY_SOURCE_DATASET_NAMESPACE = "source://LineageSourceFunction";
private static final String LEGACY_SINK_DATASET_NAME = "LineageSinkFunction";
private static final String LEGACY_SINK_DATASET_NAMESPACE = "sink://LineageSinkFunction";

@Test
void testExtractLineageGraphFromLegacyTransformations() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = env.addSource(new LineageSourceFunction());
DataStreamSink<Long> sink = source.addSink(new LineageSinkFunction());

LineageGraph lineageGraph =
LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));

assertThat(lineageGraph.sources().size()).isEqualTo(1);
assertThat(lineageGraph.sources().get(0).boundedness())
.isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
.isEqualTo(LEGACY_SOURCE_DATASET_NAME);
assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
.isEqualTo(LEGACY_SOURCE_DATASET_NAMESPACE);

assertThat(lineageGraph.sinks().size()).isEqualTo(1);
assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
.isEqualTo(LEGACY_SINK_DATASET_NAME);
assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
.isEqualTo(LEGACY_SINK_DATASET_NAMESPACE);

assertThat(lineageGraph.relations().size()).isEqualTo(1);
}

@Test
void testExtractLineageGraphFromTransformations() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source =
env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
DataStreamSink<Long> sink = source.sinkTo(new LineageSink());

LineageGraph lineageGraph =
LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));

assertThat(lineageGraph.sources().size()).isEqualTo(1);
assertThat(lineageGraph.sources().get(0).boundedness()).isEqualTo(Boundedness.BOUNDED);
assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
.isEqualTo(SOURCE_DATASET_NAME);
assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
.isEqualTo(SOURCE_DATASET_NAMESPACE);

assertThat(lineageGraph.sinks().size()).isEqualTo(1);
assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
.isEqualTo(SINK_DATASET_NAME);
assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
.isEqualTo(SINK_DATASET_NAMESPACE);

assertThat(lineageGraph.relations().size()).isEqualTo(1);
}

private static class LineageSink extends DiscardingSink<Long> implements LineageVertexProvider {
public LineageSink() {
super();
}

@Override
public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
lineageVertex.addLineageDataset(lineageDataset);
return lineageVertex;
}
}

private static class LineageSource extends NumberSequenceSource
implements LineageVertexProvider {

public LineageSource(long from, long to) {
super(from, to);
}

@Override
public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
DefaultSourceLineageVertex lineageVertex =
new DefaultSourceLineageVertex(Boundedness.BOUNDED);
lineageVertex.addDataset(lineageDataset);
return lineageVertex;
}
}

private static class LineageSourceFunction
implements SourceFunction<Long>, LineageVertexProvider {
private volatile boolean running = true;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
long next = 0;
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(next++);
}
}
}

@Override
public void cancel() {
running = false;
}

@Override
public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
LEGACY_SOURCE_DATASET_NAME,
LEGACY_SOURCE_DATASET_NAMESPACE,
new HashMap<>());
DefaultSourceLineageVertex lineageVertex =
new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
lineageVertex.addDataset(lineageDataset);
return lineageVertex;
}
}

private static class LineageSinkFunction implements SinkFunction<Long>, LineageVertexProvider {

@Override
public LineageVertex getLineageVertex() {
LineageDataset lineageDataset =
new DefaultLineageDataset(
LEGACY_SINK_DATASET_NAME,
LEGACY_SINK_DATASET_NAMESPACE,
new HashMap<>());
DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
lineageVertex.addLineageDataset(lineageDataset);
return lineageVertex;
}
}
}
Loading

0 comments on commit bcacea9

Please sign in to comment.