Commit 19040a6
[FLINK-9578] [sql-client] Allow to define an auto watermark interval in SQL Client

This closes apache#6160.
twalthr committed Jul 3, 2018
1 parent 8c15d37 commit 19040a6
Showing 9 changed files with 91 additions and 5 deletions.
3 changes: 2 additions & 1 deletion docs/dev/table/sqlClient.md
@@ -185,12 +185,13 @@ tables:

execution:
type: streaming # required: execution mode either 'batch' or 'streaming'
- result-mode: table # required: either 'table' or 'changelog'
time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
parallelism: 1 # optional: Flink's parallelism (1 by default)
+ periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
+ result-mode: table # required: either 'table' or 'changelog'

# Deployment properties allow for describing the cluster to which table programs are submitted.

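For context, the new 'periodic-watermarks-interval' option maps onto Flink's auto watermark interval on the ExecutionConfig; the actual wiring is in the ExecutionContext change further down in this commit. A minimal DataStream-API sketch of the equivalent settings (illustrative only, not part of the commit):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkIntervalSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // corresponds to 'time-characteristic: event-time' in the YAML above
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // corresponds to 'periodic-watermarks-interval: 200' (value in milliseconds)
        env.getConfig().setAutoWatermarkInterval(200L);
    }
}
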
@@ -46,6 +46,8 @@ execution:
type: streaming
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog' or 'table' presentation of results
result-mode: changelog
# parallelism of the program
@@ -50,6 +50,12 @@ public boolean isStreamingExecution() {
PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING);
}

public boolean isBatchExecution() {
return Objects.equals(
properties.get(PropertyStrings.EXECUTION_TYPE),
PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
}

public TimeCharacteristic getTimeCharacteristic() {
final String s = properties.getOrDefault(
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
@@ -64,6 +70,10 @@ public TimeCharacteristic getTimeCharacteristic() {
}
}

public long getPeriodicWatermarksInterval() {
return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_PERIODIC_WATERMARKS_INTERVAL, Long.toString(200L)));
}

public long getMinStateRetention() {
return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
}
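The new accessor above falls back to 200 ms when the key is absent, matching the documented default. A standalone sketch of the same lookup-with-default pattern against a plain map (the map and key literals here are illustrative, not the SQL Client's property holder):

import java.util.HashMap;
import java.util.Map;

public class WatermarkIntervalDefaultSketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();

        // Key absent: getOrDefault supplies "200", so the interval is 200 ms.
        long interval = Long.parseLong(
            properties.getOrDefault("periodic-watermarks-interval", Long.toString(200L)));
        System.out.println(interval); // 200

        // Key present (as in the test environment file at the end of this commit).
        properties.put("periodic-watermarks-interval", "99");
        interval = Long.parseLong(
            properties.getOrDefault("periodic-watermarks-interval", Long.toString(200L)));
        System.out.println(interval); // 99
    }
}
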
@@ -41,6 +41,8 @@ private PropertyStrings() {

public static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME = "processing-time";

public static final String EXECUTION_PERIODIC_WATERMARKS_INTERVAL = "periodic-watermarks-interval";

public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention";

public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention";
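The constant is the key as it appears inside the 'execution' section of an environment file; when session properties are flattened, the section name becomes a prefix, which is what LocalExecutorITCase asserts further down. A trivial illustration (the concatenation is only for the example, not SQL Client code):

public class PropertyKeySketch {
    public static void main(String[] args) {
        // key within the 'execution' section (PropertyStrings.EXECUTION_PERIODIC_WATERMARKS_INTERVAL)
        String key = "periodic-watermarks-interval";
        // flattened session property checked in LocalExecutorITCase: execution.periodic-watermarks-interval
        System.out.println("execution." + key);
    }
}
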
@@ -35,6 +35,7 @@
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.BatchQueryConfig;
@@ -193,10 +194,12 @@ private EnvironmentInstance() {
streamExecEnv = createStreamExecutionEnvironment();
execEnv = null;
tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv);
- } else {
+ } else if (mergedEnv.getExecution().isBatchExecution()) {
streamExecEnv = null;
execEnv = createExecutionEnvironment();
tableEnv = TableEnvironment.getTableEnvironment(execEnv);
+ } else {
+ throw new SqlExecutionException("Unsupported execution type specified.");
}

// create query config
@@ -265,6 +268,9 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
env.setParallelism(mergedEnv.getExecution().getParallelism());
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
env.getConfig().setAutoWatermarkInterval(mergedEnv.getExecution().getPeriodicWatermarksInterval());
}
return env;
}

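The guard above applies the interval only under event time, where it controls how often Flink invokes the periodic watermark generators of a job. As a rough illustration of what the interval drives (not part of this commit; the SensorReading type and the 5-second lateness bound are invented for the example):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class SensorWatermarkAssigner implements AssignerWithPeriodicWatermarks<SensorWatermarkAssigner.SensorReading> {

    // Hypothetical event type, defined inline to keep the sketch self-contained.
    public static class SensorReading {
        public long timestamp;
    }

    private static final long MAX_LATENESS_MS = 5000L; // assumed out-of-orderness bound

    // Start low, but above Long.MIN_VALUE so the subtraction below cannot underflow.
    private long maxTimestampSeen = Long.MIN_VALUE + MAX_LATENESS_MS;

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, element.timestamp);
        return element.timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Invoked by Flink once per auto watermark interval, i.e. every
        // 'periodic-watermarks-interval' milliseconds configured above.
        return new Watermark(maxTimestampSeen - MAX_LATENESS_MS);
    }
}

Such an assigner would be attached with DataStream#assignTimestampsAndWatermarks; a smaller interval yields more frequent watermarks at the cost of more per-record overhead.
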
@@ -52,8 +52,8 @@ public void testMerging() throws Exception {
tables.add("TableNumber2");
tables.add("NewTable");

- assertEquals(merged.getTables().keySet(), tables);
+ assertEquals(tables, merged.getTables().keySet());
assertTrue(merged.getExecution().isStreamingExecution());
- assertEquals(merged.getExecution().getMaxParallelism(), 16);
+ assertEquals(16, merged.getExecution().getMaxParallelism());
}
}
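The two swapped calls above are a small test hygiene fix: JUnit's assertEquals takes the expected value first, so reversed arguments produce a misleading failure message. A minimal illustration (the value 32 is made up to force a failure):

import static org.junit.Assert.assertEquals;

public class AssertOrderSketch {
    public static void main(String[] args) {
        int actualMaxParallelism = 32; // pretend the merged environment returned the wrong value
        // Correct order: fails with "expected:<16> but was:<32>".
        assertEquals(16, actualMaxParallelism);
        // Swapped order would fail with "expected:<32> but was:<16>",
        // wrongly suggesting that 32 was the intended value.
    }
}
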
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;

import org.apache.commons.cli.Options;
import org.junit.Test;

import java.util.Collections;

import static org.junit.Assert.assertEquals;

/**
* Test for {@link ExecutionContext}.
*/
public class ExecutionContextTest {

private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";

@Test
public void testExecutionConfig() throws Exception {
final ExecutionContext<?> context = createExecutionContext();
final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig();
assertEquals(99, config.getAutoWatermarkInterval());
}

private <T> ExecutionContext<T> createExecutionContext() throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
DEFAULTS_ENVIRONMENT_FILE,
Collections.singletonMap("$VAR_2", "streaming"));
final SessionContext session = new SessionContext("test-session", new Environment());
final Configuration flinkConfig = new Configuration();
return new ExecutionContext<>(
env,
session,
Collections.emptyList(),
flinkConfig,
new Options(),
Collections.singletonList(new DefaultCLI(flinkConfig)));
}
}
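The new test drives the 'periodic-watermarks-interval: 99' value from the modified environment file (see the last hunk of this commit) into the ExecutionConfig. EnvironmentFileUtil.parseModified itself is not shown; judging from its usage, it substitutes placeholder tokens such as $VAR_2 in the YAML resource before parsing, roughly as in this sketch (an assumption about the helper, not its actual code):

import java.util.Collections;
import java.util.Map;

public class PlaceholderSubstitutionSketch {
    public static void main(String[] args) {
        String yaml = "execution:\n"
            + "  type: \"$VAR_2\"\n"
            + "  time-characteristic: event-time\n"
            + "  periodic-watermarks-interval: 99\n";

        Map<String, String> replacements = Collections.singletonMap("$VAR_2", "streaming");
        for (Map.Entry<String, String> replacement : replacements.entrySet()) {
            yaml = yaml.replace(replacement.getKey(), replacement.getValue());
        }

        // The substituted YAML now describes a streaming session whose auto
        // watermark interval the test expects to be 99 ms.
        System.out.println(yaml);
    }
}
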
@@ -123,6 +123,7 @@ public void testGetSessionProperties() throws Exception {
final Map<String, String> expectedProperties = new HashMap<>();
expectedProperties.put("execution.type", "streaming");
expectedProperties.put("execution.time-characteristic", "event-time");
expectedProperties.put("execution.periodic-watermarks-interval", "99");
expectedProperties.put("execution.parallelism", "1");
expectedProperties.put("execution.max-parallelism", "16");
expectedProperties.put("execution.max-idle-state-retention", "0");
@@ -266,7 +267,7 @@ public void testBatchQueryExecution() throws Exception {

private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
return new LocalExecutor(
- EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE),
+ EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
@@ -66,6 +66,7 @@ tables:
execution:
type: "$VAR_2"
time-characteristic: event-time
periodic-watermarks-interval: 99
parallelism: 1
max-parallelism: 16
min-idle-state-retention: 0