Commit 3e89818

Revert "[FLINK-19850] Add e2e tests for the new FileSink in streaming mode"

This reverts commit dfd2a55.
rmetzger committed Nov 6, 2020
1 parent 79ded59 commit 3e89818
Showing 5 changed files with 28 additions and 55 deletions.
16 changes: 5 additions & 11 deletions flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml

@@ -27,8 +27,8 @@ under the License.
 
 	<modelVersion>4.0.0</modelVersion>
 
-	<artifactId>flink-file-sink-test</artifactId>
-	<name>Flink : E2E Tests : File sink</name>
+	<artifactId>flink-streaming-file-sink-test</artifactId>
+	<name>Flink : E2E Tests : Streaming file sink</name>
 
 	<dependencies>
 		<dependency>
@@ -37,12 +37,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-files</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 	</dependencies>
 
 	<build>
@@ -52,16 +46,16 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<executions>
 					<execution>
-						<id>FileSinkTestProgram</id>
+						<id>StreamingFileSinkSinkTestProgram</id>
 						<phase>package</phase>
 						<goals>
 							<goal>shade</goal>
 						</goals>
 						<configuration>
-							<finalName>FileSinkProgram</finalName>
+							<finalName>StreamingFileSinkProgram</finalName>
 							<transformers>
 								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>FileSinkProgram</mainClass>
+									<mainClass>StreamingFileSinkProgram</mainClass>
 								</transformer>
 							</transformers>
 						</configuration>

46 changes: 15 additions & 31 deletions flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java

@@ -24,13 +24,11 @@
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -42,7 +40,7 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Test program for the {@link StreamingFileSink} and {@link FileSink}.
+ * Test program for the {@link StreamingFileSink}.
  *
  * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds,
  * after which it idles and waits for job cancellation. Every record has a unique index that is
@@ -52,51 +50,37 @@
  * Adding all committed part files together, and numerically sorting the contents, should
  * result in a complete sequence from 0 (inclusive) to 60000 (exclusive).
  */
-public enum FileSinkProgram {
+public enum StreamingFileSinkProgram {
 	;
 
 	public static void main(final String[] args) throws Exception {
 		final ParameterTool params = ParameterTool.fromArgs(args);
 		final String outputPath = params.getRequired("outputPath");
-		final String sinkToTest = params.getRequired("sinkToTest");
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setParallelism(4);
 		env.enableCheckpointing(5000L);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(10L, TimeUnit.SECONDS)));
 
+		final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+			.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+				PrintStream out = new PrintStream(stream);
+				out.println(element.f1);
+			})
+			.withBucketAssigner(new KeyBucketAssigner())
+			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.build();
+
 		// generate data, shuffle, sink
-		DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));
-
-		if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
-			final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
-				.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
-					PrintStream out = new PrintStream(stream);
-					out.println(element.f1);
-				})
-				.withBucketAssigner(new KeyBucketAssigner())
-				.withRollingPolicy(OnCheckpointRollingPolicy.build())
-				.build();
-
-			source.keyBy(0).addSink(sink);
-		} else if (sinkToTest.equalsIgnoreCase("FileSink")){
-			FileSink<Tuple2<Integer, Integer>> sink = FileSink
-				.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
-					PrintStream out = new PrintStream(stream);
-					out.println(element.f1);
-				})
-				.withBucketAssigner(new KeyBucketAssigner())
-				.withRollingPolicy(OnCheckpointRollingPolicy.build())
-				.build();
-			source.keyBy(0).sinkTo(sink);
-		} else {
-			throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
-		}
+		env.addSource(new Generator(10, 10, 60))
+			.keyBy(0)
+			.addSink(sink);
 
 		env.execute("StreamingFileSinkProgram");
 	}
 
+
 	/**
 	 * Use first field for buckets.
 	 */
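
Note: the removed branch above is the only place where the two sink APIs appear side by side. Below is a minimal, self-contained sketch of the difference, assuming Flink 1.12+ with flink-connector-files on the classpath; the class name, example records, and output paths are illustrative and not part of this commit.

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

import java.nio.charset.StandardCharsets;

public class SinkWiringSketch {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000L); // both sinks commit part files on checkpoints

		final DataStream<String> records = env.fromElements("1", "2", "3");
		final Encoder<String> encoder =
			(element, stream) -> stream.write((element + "\n").getBytes(StandardCharsets.UTF_8));

		// Legacy API (kept by this revert): build a StreamingFileSink and attach it with addSink(...).
		final StreamingFileSink<String> legacySink = StreamingFileSink
			.forRowFormat(new Path("/tmp/legacy-out"), encoder)
			.withRollingPolicy(OnCheckpointRollingPolicy.build())
			.build();
		records.addSink(legacySink);

		// New unified sink API (removed by this revert): build a FileSink and attach it with sinkTo(...).
		final FileSink<String> newSink = FileSink
			.forRowFormat(new Path("/tmp/new-out"), encoder)
			.withRollingPolicy(OnCheckpointRollingPolicy.build())
			.build();
		records.sinkTo(newSink);

		env.execute("sink wiring sketch");
	}
}

Because the revert leaves only the StreamingFileSink path, the --sinkToTest job parameter and the runtime branch become dead weight, which is why the remaining files below drop them as well.
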
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/pom.xml
@@ -68,7 +68,7 @@ under the License.
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
 		<module>flink-sql-client-test</module>
-		<module>flink-file-sink-test</module>
+		<module>flink-streaming-file-sink-test</module>
 		<module>flink-state-evolution-test</module>
 		<module>flink-rocksdb-state-memory-control-test</module>
 		<module>flink-end-to-end-tests-common</module>

7 changes: 2 additions & 5 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -185,11 +185,8 @@ run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sq
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 
-run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
-run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
-run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local FileSink" "skip_check_exceptions"
-run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 FileSink" "skip_check_exceptions"
-
+run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
+run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
 run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"

12 changes: 5 additions & 7 deletions flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh

@@ -18,9 +18,8 @@
 ################################################################################
 
 OUT_TYPE="${1:-local}" # other type: s3
-SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
+S3_PREFIX=temp/test_streaming_file_sink-$(uuidgen)
 OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
 S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
@@ -61,7 +60,7 @@ if [ "${OUT_TYPE}" == "s3" ]; then
     on_exit out_cleanup
 fi
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -136,16 +135,15 @@ function wait_for_complete_result {
     done
 }
 
-function run_file_sink_test {
+function run_streaming_file_sink_test {
     start_cluster
 
     "${FLINK_DIR}/bin/taskmanager.sh" start
     "${FLINK_DIR}/bin/taskmanager.sh" start
    "${FLINK_DIR}/bin/taskmanager.sh" start
 
     echo "Submitting job."
-    CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}" \
-        --sinkToTest "${SINK_TO_TEST}")
+    CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
     JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
     if [[ -z $JOB_ID ]]; then
@@ -189,4 +187,4 @@
 }
 
 # usual runtime is ~6 minutes
-run_test_with_timeout 900 run_file_sink_test
+run_test_with_timeout 900 run_streaming_file_sink_test
