Skip to content

Commit

Permalink
upgrade flink1.9
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Nov 21, 2019
1 parent 2411ac6 commit d3882da
Show file tree
Hide file tree
Showing 22 changed files with 273 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
Expand Down Expand Up @@ -82,6 +83,11 @@ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    // Legacy (pre-Flink-1.9) TableSink entry point. It simply forwards to
    // consumeDataStream so both code paths share one sink-wiring implementation.
    consumeDataStream(dataStream);
}

@Override
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat();
builder.setAddress(this.address)
.setDatabase(this.database)
Expand All @@ -100,7 +106,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {

CassandraOutputFormat outputFormat = builder.finish();
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
dataStream.addSink(richSinkFunction);
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
return dataStreamSink;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
Expand Down Expand Up @@ -71,16 +72,22 @@ public TypeInformation<?>[] getFieldTypes() {

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
// NOTE(review): this span comes from a diff view whose +/- markers were lost in
// scraping. The builder lines below appear to be the REMOVED pre-1.9 body that
// the commit replaced with a no-op (sink wiring moved to consumeDataStream) —
// verify against the actual repository file before relying on this method.
ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
builder.setFieldNames(this.fieldNames)
.setFieldTypes(this.fieldTypes);
ConsoleOutputFormat outputFormat = builder.finish();
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
dataStream.addSink(richSinkFunction);
// flink 1.9 use consumeDataStream
}

@Override
public ConsoleSink genStreamSink(TargetTableInfo targetTableInfo) {
    // The console sink ignores targetTableInfo here — no per-table configuration
    // is applied in this method; the sink instance itself is returned.
    return this;
}

@Override
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    // Build the console OutputFormat from this sink's declared schema.
    ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
    builder.setFieldNames(this.fieldNames)
            .setFieldTypes(this.fieldTypes);
    ConsoleOutputFormat outputFormat = builder.finish();
    // Wrap the OutputFormat in a SinkFunction and attach it to the stream.
    // Typed generics (instead of the original raw RichSinkFunction/DataStreamSink)
    // remove unchecked warnings and keep the declared element type on the returned
    // sink; the needless local DataStreamSink temp is gone. NOTE(review): assumes
    // ConsoleOutputFormat is an OutputFormat over Tuple2<Boolean, Row> — confirm
    // against its declaration (not visible in this chunk).
    RichSinkFunction<Tuple2<Boolean, Row>> sinkFunction =
            new OutputFormatSinkFunction<Tuple2<Boolean, Row>>(outputFormat);
    return dataStream.addSink(sinkFunction);
}
}
59 changes: 30 additions & 29 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@
</properties>

<dependencies>
<!-- blink table module-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

Expand All @@ -56,16 +62,23 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>


<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-server</artifactId>
Expand All @@ -92,28 +105,16 @@
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>

</dependencies>
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/java/com/dtstack/flink/sql/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import com.dtstack.flink.sql.enums.ClusterMode;
import com.dtstack.flink.sql.enums.ECacheType;
import com.dtstack.flink.sql.enums.EPluginLoadMode;
//import com.dtstack.flink.sql.exec.FlinkSQLExec;
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
import com.dtstack.flink.sql.exec.FlinkSQLExec;
import com.dtstack.flink.sql.option.OptionParser;
import com.dtstack.flink.sql.parser.CreateFuncParser;
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
Expand Down Expand Up @@ -66,8 +66,9 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -187,7 +188,8 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
//sql-dimensional table contains the dimension table of execution
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
}else{
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
// FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
tableEnv.sqlUpdate(result.getExecSql());
if(LOG.isInfoEnabled()){
LOG.info("exec sql: " + result.getExecSql());
}
Expand Down Expand Up @@ -216,7 +218,7 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
}
}

private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
//register urf
// udf和tableEnv须由同一个类加载器加载
Expand Down Expand Up @@ -249,7 +251,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
String adaptSql = sourceTableInfo.getAdaptSelectSql();
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);

RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
.returns(typeInfo);
Expand Down Expand Up @@ -353,12 +355,16 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
* 获取StreamTableEnvironment并设置相关属性
*
* @param confProperties
* @param env
* @return
*/
// Creates a streaming TableEnvironment using the Blink planner (Flink 1.9 API)
// and applies the trimmed configuration properties (table TTL) to it.
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    confProperties = PropertiesUtils.propertiesTrim(confProperties);
    // NOTE(review): the next line re-declares tableEnv via the pre-1.9 API
    // (StreamTableEnvironment.getTableEnvironment) — a duplicate declaration that
    // cannot compile. This page is a diff scrape that lost its +/- markers; in the
    // committed file this line is almost certainly the DELETED side of the hunk.
    // Verify against the repository before using this text as source.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
    FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
    return tableEnv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
return execute(streamGraph);
}

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setClasspaths(classpaths);

Expand Down
Loading

0 comments on commit d3882da

Please sign in to comment.