Skip to content

Commit

Permalink
[FLINK-16560] Forward Configuration in PackagedProgramUtils#getPipeli…
Browse files Browse the repository at this point in the history
…neFromProgram

Before, when using PackagedProgramUtils (for example in the standalone
cluster entrypoint or the web ui) the Flink Configuration would not be
applied to the execution environment.

This also adds a test that verifies that we forward configuration.
  • Loading branch information
aljoscha committed Apr 2, 2020
1 parent c8a23c7 commit a24734e
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private Configuration getEffectiveConfiguration(
*
* @param args Command line arguments for the info action.
*/
protected void info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException {
protected void info(String[] args) throws Exception {
LOG.info("Running 'info' command.");

final Options commandOptions = CliFrontendParser.getInfoCommandOptions();
Expand Down Expand Up @@ -265,7 +265,10 @@ protected void info(String[] args) throws CliArgsException, FileNotFoundExceptio

LOG.info("Creating program plan dump");

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism, true);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies());

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);

if (jsonPlan != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;

/**
Expand All @@ -34,7 +35,8 @@ public Pipeline getPipeline() {
return pipeline;
}

public OptimizerPlanEnvironment(int parallelism) {
public OptimizerPlanEnvironment(Configuration configuration, int parallelism) {
super(configuration);
if (parallelism > 0) {
setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static JobGraph createJobGraph(
int defaultParallelism,
@Nullable JobID jobID,
boolean suppressOutput) throws ProgramInvocationException {
final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism, suppressOutput);
final Pipeline pipeline = getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism);

if (jobID != null) {
Expand Down Expand Up @@ -93,6 +93,7 @@ public static JobGraph createJobGraph(

public static Pipeline getPipelineFromProgram(
PackagedProgram program,
Configuration configuration,
int parallelism,
boolean suppressOutput) throws CompilerException, ProgramInvocationException {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Expand All @@ -116,9 +117,9 @@ public static Pipeline getPipelineFromProgram(
}

// temporary hack to support the optimizer plan preview
OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(parallelism);
OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism);
benv.setAsContext();
StreamPlanEnvironment senv = new StreamPlanEnvironment(parallelism);
StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism);
senv.setAsContext();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
Expand All @@ -37,7 +38,8 @@ public Pipeline getPipeline() {
return pipeline;
}

public StreamPlanEnvironment(int parallelism) {
public StreamPlanEnvironment(Configuration configuration, int parallelism) {
super(configuration);
if (parallelism > 0) {
setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);

// we expect this to fail with a "ClassNotFoundException"
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666, true);
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true);
FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
fail("Should have failed with a ClassNotFoundException");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testGetExecutionPlan() throws ProgramInvocationException {
.build();

Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1, true);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, new Configuration(), 1, true);
OptimizedPlan op = optimizer.compile(plan);
assertNotNull(op);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetExecutionPlan() {
config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());

Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1, true);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, config, -1, true);
OptimizedPlan op = optimizer.compile(plan);
assertNotNull(op);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.client.program;

import org.apache.flink.configuration.Configuration;

import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -47,7 +49,7 @@ private void runOutputTest(boolean suppressOutput, String[] expectedCapturedOutp

try {
// Flink will throw an error because no job graph will be generated by the main method.
PackagedProgramUtils.getPipelineFromProgram(packagedProgram, 1, suppressOutput);
PackagedProgramUtils.getPipelineFromProgram(packagedProgram, new Configuration(), 1, suppressOutput);
Assert.fail("This should have failed to create the Flink Plan.");
} catch (ProgramInvocationException e) {
// Test that that Flink captured the expected stdout/stderr
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.junit.Test;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests {@link PackagedProgramUtils}.
*/
public class PackagedProgramUtilsTest {

/**
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
@Test
public void testDataSetConfigurationForwarding() throws Exception {
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(DataSetTestProgram.class.getName())
.build();

Configuration config = createConfigurationWithOption();

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig();

assertExpectedOption(executionConfig);
}

/**
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
@Test
public void testDataStreamConfigurationForwarding() throws Exception {
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(DataStreamTestProgram.class.getName())
.build();

Configuration config = createConfigurationWithOption();

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig();

assertExpectedOption(executionConfig);
}

private static void assertPrecondition(ExecutionConfig executionConfig) {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false));
}

private static void assertExpectedOption(ExecutionConfig executionConfig) {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
}

private static Configuration createConfigurationWithOption() {
Configuration config = new Configuration();
config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
return config;
}

/** Test Program for the DataSet API. */
public static class DataSetTestProgram {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}

/** Test Program for the DataStream API. */
public static class DataStreamTestProgram {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}
}

0 comments on commit a24734e

Please sign in to comment.