Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
harbby committed Sep 22, 2020
2 parents 48f7eb1 + 347cd26 commit 1451df9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package ideal.sylph.runner.flink.engines;

import com.github.harbby.gadtry.aop.AopGo;
import com.github.harbby.gadtry.ioc.Autowired;
import com.github.harbby.gadtry.jvm.JVMException;
import com.github.harbby.gadtry.jvm.JVMLauncher;
Expand All @@ -26,14 +27,6 @@
import ideal.sylph.spi.job.Flow;
import ideal.sylph.spi.job.JobConfig;
import ideal.sylph.spi.model.ConnectorInfo;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
Expand All @@ -42,13 +35,13 @@

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URLClassLoader;
import java.util.Collection;
import java.util.Collections;

import static com.github.harbby.gadtry.base.Throwables.throwsException;
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.fusesource.jansi.Ansi.Color.GREEN;
import static org.fusesource.jansi.Ansi.Color.YELLOW;
Expand Down Expand Up @@ -100,52 +93,10 @@ private static JobGraph compile(String jobId, StringFlow flow, FlinkJobConfig jo
.setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
.setCallable(() -> {
//---set env
Configuration configuration = new Configuration();
OptimizerPlanEnvironment planEnvironment = new OptimizerPlanEnvironment(configuration, jobClassLoader, jobConfig.getParallelism());
ExecutionEnvironmentFactory factory = () -> planEnvironment;
Method method = ExecutionEnvironment.class.getDeclaredMethod("initializeContextEnvironment", ExecutionEnvironmentFactory.class);
method.setAccessible(true);
method.invoke(null, factory);

//--set streamEnv
Class<?> mainClass = Class.forName(flow.mainClass);
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkEnvFactory.setJobConfig(execEnv, jobConfig, jobId);
StreamExecutionEnvironmentFactory streamFactory = () -> execEnv;
Method m1 = StreamExecutionEnvironment.class.getDeclaredMethod("initializeContextEnvironment", StreamExecutionEnvironmentFactory.class);
m1.setAccessible(true);
m1.invoke(null, streamFactory);
//---
Class<?> mainClass = Class.forName(flow.mainClass);
Method main = mainClass.getMethod("main", String[].class);
try {
/**
* {@link org.apache.flink.client.program.OptimizerPlanEnvironment#executeAsync(String)}
*
* throw new OptimizerPlanEnvironment.ProgramAbortException();
* */
main.invoke(null, (Object) new String[0]);
throwsException(ProgramInvocationException.class);
}
catch (ProgramInvocationException e) {
throw e;
}
catch (Throwable t) {
Field field = OptimizerPlanEnvironment.class.getDeclaredField("pipeline");
field.setAccessible(true);
Pipeline flinkPlan = (Pipeline) field.get(planEnvironment);
if (flinkPlan == null) {
throw new ProgramInvocationException("The program caused an error: ", t);
}
if (flinkPlan instanceof StreamGraph) {
return ((StreamGraph) flinkPlan).getJobGraph();
}
else {
final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
return jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, null);
}
}

throw new ProgramInvocationException("The program plan could not be fetched - the program aborted pre-maturely.");
return getJobGraphForJarClass(execEnv, mainClass, new String[0]);
})
.setClassLoader(jobClassLoader)
.addUserURLClassLoader(jobClassLoader)
Expand All @@ -154,6 +105,27 @@ private static JobGraph compile(String jobId, StringFlow flow, FlinkJobConfig jo
return launcher.startAndGet();
}

private static JobGraph getJobGraphForJarClass(StreamExecutionEnvironment execEnv, Class<?> mainClass, String[] args)
throws Exception
{
final StreamExecutionEnvironment mock = AopGo.proxy(StreamExecutionEnvironment.class)
.byInstance(execEnv)
.aop(binder -> {
binder.doAround(x -> null).when().execute((String) null);
binder.doAround(x -> null).when().execute((StreamGraph) null);
}).build();

StreamExecutionEnvironmentFactory streamFactory = () -> mock;
Method method = StreamExecutionEnvironment.class.getDeclaredMethod("initializeContextEnvironment", StreamExecutionEnvironmentFactory.class);
method.setAccessible(true);
method.invoke(null, streamFactory);

Method main = mainClass.getMethod("main", String[].class);
checkState(Modifier.isStatic(main.getModifiers()));
main.invoke(null, (Object) args);
return mock.getStreamGraph().getJobGraph();
}

public static class StringFlow
extends Flow
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -65,7 +57,6 @@
import java.util.stream.Collectors;

import static com.github.harbby.gadtry.base.Throwables.throwsException;
import static com.github.harbby.gadtry.base.Throwables.throwsThrowable;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.apache.calcite.sql.SqlKind.AS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public SylphTableSink(final RowTypeInfo rowTypeInfo, UnaryOperator<DataStream<Ro
@Override
public DataType getConsumedDataType()
{

return tableSchema.toRowDataType();
}

Expand Down

0 comments on commit 1451df9

Please sign in to comment.