[FLINK-8852] [sql-client] Add support for FLIP-6 in SQL Client
This closes apache#5704.
twalthr committed Mar 26, 2018
1 parent a6a7623 commit d8a376a
Showing 20 changed files with 561 additions and 289 deletions.
12 changes: 12 additions & 0 deletions flink-clients/pom.xml
@@ -147,6 +147,18 @@ under the License.
</execution>
</executions>
</plugin>
<!-- Make test classes available to other modules. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
@@ -420,7 +420,7 @@ public static CommandLine parse(Options options, String[] args, boolean stopAtNo
* @param optionsB options to merge, can be null if none
* @return
*/
static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
final Options resultOptions = new Options();
if (optionsA != null) {
for (Option option : optionsA.getOptions()) {
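Below is a brief, hypothetical sketch (not part of this commit) of why the visibility change matters: with mergeOptions(...) public, an embedded client such as the SQL Client can combine the standard run options with the options of the active CustomCommandLine before parsing its own argument list. The class and method names are taken from the client API; the wiring itself is an assumption for illustration.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;

public final class MergedOptionsSketch {

	// Hypothetical helper: merge the run options with a custom command line's options
	// and parse the given arguments, tolerating unknown tokens (stopAtNonOptions = true).
	static CommandLine parseRunArgs(CustomCommandLine<?> customCommandLine, String[] args) throws Exception {
		final Options customOptions = new Options();
		customCommandLine.addRunOptions(customOptions);
		final Options merged = CliFrontendParser.mergeOptions(
			CliFrontendParser.getRunCommandOptions(), customOptions);
		return CliFrontendParser.parse(merged, args, true);
	}
}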
@@ -460,7 +460,7 @@ public JobSubmissionResult run(
public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}

@@ -882,17 +882,17 @@ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars pr
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}

public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
return getJobGraph(flinkConfig, optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
}

public JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
JobGraph job;
if (optPlan instanceof StreamingPlan) {
job = ((StreamingPlan) optPlan).getJobGraph();
job.setSavepointRestoreSettings(savepointSettings);
} else {
JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
JobGraphGenerator gen = new JobGraphGenerator(flinkConfig);
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}

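The following is a hedged sketch of the pattern the now-static getJobGraph(...) enables: a job graph can be assembled before any ClusterClient exists, which is what a dedicated per-job (FLIP-6) deployment needs. The plan, jar list, and classpath list are assumed to be supplied by the caller.

import java.net.URL;
import java.util.List;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public final class JobGraphSketch {

	// Illustrative only: the plan, jars, and classpaths come from the embedding client.
	static JobGraph createJobGraph(Configuration flinkConfig, FlinkPlan plan,
			List<URL> jarFiles, List<URL> classpaths) {
		// No ClusterClient instance is required anymore; the graph can be built first
		// and then handed to a newly deployed (per-job) cluster.
		return ClusterClient.getJobGraph(
			flinkConfig, plan, jarFiles, classpaths, SavepointRestoreSettings.none());
	}
}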
@@ -64,8 +64,6 @@ execution:
# programs are submitted to.

deployment:
# only the 'standalone' deployment is supported
type: standalone
# general cluster communication timeout in ms
response-timeout: 5000
# (optional) address from cluster to gateway
9 changes: 9 additions & 0 deletions flink-libraries/flink-sql-client/pom.xml
@@ -129,6 +129,15 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<!-- use a dedicated Scala version to not depend on it -->
<artifactId>flink-clients_2.11</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -32,4 +32,8 @@ public SqlClientException(String message) {
public SqlClientException(String message, Throwable e) {
super(message, e);
}

public SqlClientException(Throwable e) {
super(e);
}
}
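A minimal, hypothetical usage of the new wrapping constructor (assumed to sit next to SqlClientException): rethrow a checked exception from cleanup code without composing a message.

// Hypothetical helper; 'resource' stands for any AutoCloseable used during deployment.
static void closeOrFail(AutoCloseable resource) {
	try {
		resource.close();
	} catch (Exception e) {
		// wrap the checked exception; the message is taken from the cause
		throw new SqlClientException(e);
	}
}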
@@ -18,8 +18,16 @@

package org.apache.flink.table.client.config;

import org.apache.flink.client.cli.CliFrontendParser;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@@ -58,6 +66,41 @@ public int getGatewayPort() {
return Integer.parseInt(properties.getOrDefault(PropertyStrings.DEPLOYMENT_GATEWAY_PORT, Integer.toString(0)));
}

/**
* Parses the deployment properties into a command line using the given options. Properties
* that do not correspond to a defined option are ignored.
*/
public CommandLine getCommandLine(Options commandLineOptions) throws Exception {
final List<String> args = new ArrayList<>();

properties.forEach((k, v) -> {
// only add supported options
if (commandLineOptions.hasOption(k)) {
final Option o = commandLineOptions.getOption(k);
final String argument = "--" + o.getLongOpt();
// options without args
if (!o.hasArg()) {
final Boolean flag = Boolean.parseBoolean(v);
// add key only
if (flag) {
args.add(argument);
}
}
// add key and value
else if (!o.hasArgs()) {
args.add(argument);
args.add(v);
}
// options with multiple args are not supported yet
else {
throw new IllegalArgumentException("Option '" + o + "' is not supported yet.");
}
}
});

return CliFrontendParser.parse(commandLineOptions, args.toArray(new String[args.size()]), true);
}

public Map<String, String> toProperties() {
final Map<String, String> copy = new HashMap<>();
properties.forEach((k, v) -> copy.put(PropertyStrings.DEPLOYMENT + "." + k, v));
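A hedged usage sketch for the getCommandLine(...) method shown above: deployment properties whose keys match known CLI option names are translated into arguments and parsed, so a hypothetical 'deployment.m: localhost:6123' entry in the environment file would surface as the standard --jobmanager option. The surrounding option set is assumed to be built by the caller.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import org.apache.flink.table.client.config.Deployment;

public final class DeploymentCommandLineSketch {

	// Illustrative only: 'supportedOptions' mirrors whatever option set the embedding
	// client supports (e.g. merged run and custom command line options).
	static String jobManagerAddress(Deployment deployment, Options supportedOptions) throws Exception {
		final CommandLine commandLine = deployment.getCommandLine(supportedOptions);
		// "m" is the short name of the --jobmanager option; returns null if not set
		return commandLine.getOptionValue("m");
	}
}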
@@ -30,8 +30,10 @@

/**
* Collects results and returns them as a changelog.
*
* @param <C> type of the cluster id to which this result belongs
*/
public class ChangelogCollectStreamResult extends CollectStreamResult implements ChangelogResult {
public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> implements ChangelogResult<C> {

private List<Tuple2<Boolean, Row>> changeRecordBuffer;
private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
@@ -26,8 +26,10 @@

/**
* A result that is represented as a changelog consisting of insert and delete records.
*
* @param <C> type of the cluster id to which this result belongs
*/
public interface ChangelogResult extends DynamicResult {
public interface ChangelogResult<C> extends DynamicResult<C> {

/**
* Retrieves the available result records.
@@ -37,15 +37,18 @@

/**
* A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
*
* @param <C> type of the cluster id to which this result belongs
*/
public abstract class CollectStreamResult implements DynamicResult {
public abstract class CollectStreamResult<C> implements DynamicResult<C> {

private final TypeInformation<Row> outputType;
private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
private final CollectStreamTableSink collectTableSink;
private final ResultRetrievalThread retrievalThread;
private final JobMonitoringThread monitoringThread;
private Runnable program;
private C clusterId;

protected final Object resultLock;
protected SqlExecutionException executionException;
@@ -73,6 +76,14 @@ public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig conf
monitoringThread = new JobMonitoringThread();
}

@Override
public void setClusterId(C clusterId) {
if (this.clusterId != null) {
throw new IllegalStateException("Cluster id is already present.");
}
this.clusterId = clusterId;
}

@Override
public TypeInformation<Row> getOutputType() {
return outputType;
@@ -26,8 +26,15 @@
* A result of a dynamic table program.
*
* <p>Note: Make sure to call close() after the result is not needed anymore.
*
* @param <C> type of the cluster id to which this result belongs
*/
public interface DynamicResult {
public interface DynamicResult<C> {

/**
* Sets the cluster id of the cluster this result comes from. This method should only be called once.
*/
void setClusterId(C clusterId);

/**
* Returns whether this result is materialized such that snapshots can be taken or results
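A hedged sketch of the call order the new setClusterId(...) implies: create the result, deploy the job, then attach the id of the freshly started cluster exactly once so the result can later be cancelled against that cluster. Both helper methods below are hypothetical placeholders, and the sketch is assumed to live next to DynamicResult.

import org.apache.flink.runtime.jobgraph.JobGraph;

// Illustrative call order only; <C> could for example be a YARN application id
// in a FLIP-6 per-job deployment.
abstract class ResultDeploymentSketch<C> {

	// Hypothetical placeholders for the executor's result-creation and deployment logic.
	abstract DynamicResult<C> createResult();

	abstract C deployJobOnNewCluster(JobGraph jobGraph);

	DynamicResult<C> executeQuery(JobGraph jobGraph) {
		final DynamicResult<C> result = createResult();
		final C clusterId = deployJobOnNewCluster(jobGraph);
		// may only be called once; a second call throws IllegalStateException
		result.setClusterId(clusterId);
		return result;
	}
}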