Skip to content

Commit

Permalink
[FLINK-13439] Run Streaming SQL e2e test with blink planner
Browse files Browse the repository at this point in the history
This closes apache#9276
  • Loading branch information
docete authored and dawidwys committed Aug 6, 2019
1 parent 6644321 commit f30356b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
Expand Down Expand Up @@ -77,8 +78,20 @@ public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);
String outputPath = params.getRequired("outputPath");
String planner = params.get("planner", "old");

StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
builder.inStreamingMode();

if (planner.equals("old")) {
builder.useOldPlanner();
} else if (planner.equals("blink")) {
builder.useBlinkPlanner();
}

final EnvironmentSettings settings = builder.build();

final StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Time.of(10, TimeUnit.SECONDS)
Expand All @@ -87,7 +100,7 @@ public static void main(String[] args) throws Exception {
sEnv.enableCheckpointing(4000);
sEnv.getConfig().setAutoWatermarkInterval(1000);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, settings);

tEnv.registerTableSource("table1", new GeneratorTableSource(10, 100, 60, 0));
tEnv.registerTableSource("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
Expand Down Expand Up @@ -340,5 +353,4 @@ public void restoreState(List<Integer> state) {
}
}
}

}
3 changes: 2 additions & 1 deletion flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"

run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
Expand Down
1 change: 0 additions & 1 deletion flink-end-to-end-tests/run-pre-commit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,5 @@ run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_strea
run_test "Kinesis end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kinesis.sh"
run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
run_test "Distributed cache end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh"

printf "\n[PASS] All tests passed\n"
exit 0
4 changes: 3 additions & 1 deletion flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

source "$(dirname "$0")"/common.sh

PLANNER="${1:-old}"

TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-stream-sql-test/target/StreamSQLTestProgram.jar

start_cluster
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start

$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR -outputPath file://${TEST_DATA_DIR}/out/result
$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR -outputPath file://${TEST_DATA_DIR}/out/result -planner ${PLANNER}

function sql_cleanup() {
stop_cluster
Expand Down
3 changes: 2 additions & 1 deletion tools/travis/splits/split_misc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"

run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
Expand Down
3 changes: 2 additions & 1 deletion tools/travis/splits/split_misc_hadoopfree.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"

run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
Expand Down

0 comments on commit f30356b

Please sign in to comment.