Skip to content

Commit

Permalink
[FLINK-14067] Reflectively load JSON plan generator in ExecutionPlanUtil
Browse files Browse the repository at this point in the history
We do this to get rid of the dependency on PlanExecutor for generating
the JSON plan. The executor should not be concerned with printing JSON
plans and this will simplify future executor work.
  • Loading branch information
aljoscha committed Sep 18, 2019
1 parent 394a6e3 commit cf1114b
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,64 @@

package org.apache.flink.api.java;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A utility for extracting an execution plan (as JSON) from a {@link Plan}.
*/
class ExecutionPlanUtil {
@Internal
public class ExecutionPlanUtil {

private static final String PLAN_GENERATOR_CLASS_NAME = "org.apache.flink.optimizer.plandump.ExecutionPlanJSONGenerator";

/**
* Extracts the execution plan (as JSON) from the given {@link Plan}.
*/
static String getExecutionPlanAsJSON(Plan plan) {
// make sure that we do not start an executor in any case here.
// if one runs, fine, of not, we only create the class but disregard immediately afterwards
PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(new Configuration());
return tempExecutor.getOptimizerPlanAsJSON(plan);
public static String getExecutionPlanAsJSON(Plan plan) {
checkNotNull(plan);
ExecutionPlanJSONGenerator jsonGenerator = getJSONGenerator();
return jsonGenerator.getExecutionPlan(plan);
}

private static ExecutionPlanJSONGenerator getJSONGenerator() {
Class<? extends ExecutionPlanJSONGenerator> planGeneratorClass = loadJSONGeneratorClass(
PLAN_GENERATOR_CLASS_NAME);

try {
return planGeneratorClass.getConstructor().newInstance();
} catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the plan generator ("
+ PLAN_GENERATOR_CLASS_NAME + ").", t);
}
}

private static Class<? extends ExecutionPlanJSONGenerator> loadJSONGeneratorClass(String className) {
try {
Class<?> generatorClass = Class.forName(className);
return generatorClass.asSubclass(ExecutionPlanJSONGenerator.class);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not load the plan generator class (" + className
+ "). Do you have the 'flink-optimizer' project in your dependencies?");
} catch (Throwable t) {
throw new RuntimeException(
"An error occurred while loading the plan generator (" + className + ").",
t);
}
}

/**
* Internal interface for the JSON plan generator that has to reside in the optimizer package.
* We load the actual subclass using reflection.
*/
@Internal
public interface ExecutionPlanJSONGenerator {

/**
* Returns the execution plan as a JSON string.
*/
String getExecutionPlan(Plan plan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.optimizer.plandump;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionPlanUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;

/**
* Utility for extracting an execution plan (as JSON) from a given {@link Plan}.
*
* <p>We need this util here in the optimizer because it is the only module that has {@link
* Optimizer}, {@link OptimizedPlan}, and {@link PlanJSONDumpGenerator} available. We use this
* reflectively from the batch execution environments to generate the plan, which we cannot do
* there. It is used from {@link ExecutionPlanUtil}.
*/
@SuppressWarnings("unused")
public class ExecutionPlanJSONGenerator implements ExecutionPlanUtil.ExecutionPlanJSONGenerator {

@Override
public String getExecutionPlan(Plan plan) {
Optimizer opt = new Optimizer(
new DataStatistics(),
new DefaultCostEstimator(),
new Configuration());
OptimizedPlan optPlan = opt.compile(plan);
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.optimizer.plandump;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionPlanUtil;
import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;

/**
* Tests for {@link ExecutionPlanUtil}. We have to test this here in flink-optimizer because the
* util only works when {@link ExecutionPlanJSONGenerator} is available, which is in
* flink-optimizer.
*/
public class ExecutionPlanUtilTest {

@Test
public void executionPlanCanBeRetrieved() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);

env
.readCsvFile("file:///will/never/be/executed")
.types(String.class, Double.class)
.name("sourceThatWillNotRun")
.map((in) -> in)
.returns(new TypeHint<Tuple2<String, Double>>() {})
.name("theMap")
.writeAsText("file:///will/not/be/executed")
.name("sinkThatWillNotRun");

Plan plan = env.createProgramPlan();
String executionPlanAsJSON = ExecutionPlanUtil.getExecutionPlanAsJSON(plan);

assertThat(executionPlanAsJSON, containsString("sourceThatWillNotRun"));
assertThat(executionPlanAsJSON, containsString("sinkThatWillNotRun"));
assertThat(executionPlanAsJSON, containsString("theMap"));
}
}

0 comments on commit cf1114b

Please sign in to comment.