diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
index eb7b23b5..236e7dc1 100644
--- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
+++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
@@ -28,6 +28,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -82,6 +83,11 @@ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat();
         builder.setAddress(this.address)
                 .setDatabase(this.database)
@@ -100,7 +106,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         CassandraOutputFormat outputFormat = builder.finish();
 
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
     }
 
     @Override
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
index 77a3efea..6d42e8af 100644
--- a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -71,16 +72,22 @@ public TypeInformation<?>[] getFieldTypes() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
-        builder.setFieldNames(this.fieldNames)
-                .setFieldTypes(this.fieldTypes);
-        ConsoleOutputFormat outputFormat = builder.finish();
-        RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        dataStream.addSink(richSinkFunction);
+        // flink 1.9: use consumeDataStream instead
     }
 
     @Override
     public ConsoleSink genStreamSink(TargetTableInfo targetTableInfo) {
         return this;
     }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
+        builder.setFieldNames(this.fieldNames)
+                .setFieldTypes(this.fieldTypes);
+        ConsoleOutputFormat outputFormat = builder.finish();
+        RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
+    }
 }
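Note: Flink 1.9 deprecates StreamTableSink#emitDataStream in favour of consumeDataStream, which must return the DataStreamSink it creates so the planner can chain the sink transformation — hence the delegate-and-return pattern repeated in every sink touched by this patch. A minimal sketch of the contract; DemoRetractSink and its print sink are hypothetical, not part of this change:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.table.sinks.RetractStreamTableSink;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;

    // Hypothetical minimal retract sink illustrating the 1.9 sink contract.
    public class DemoRetractSink implements RetractStreamTableSink<Row> {
        private String[] fieldNames;
        private TypeInformation<?>[] fieldTypes;

        @Override
        public TypeInformation<Row> getRecordType() {
            return new RowTypeInfo(fieldTypes, fieldNames);
        }

        // Deprecated in 1.9: keep delegating so old call sites still work.
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            consumeDataStream(dataStream);
        }

        // New in 1.9: the resulting DataStreamSink is handed back to the planner.
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new PrintSinkFunction<>());
        }

        @Override
        public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
            return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
        }

        @Override
        public String[] getFieldNames() { return fieldNames; }

        @Override
        public TypeInformation<?>[] getFieldTypes() { return fieldTypes; }

        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
            return this;
        }
    }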
diff --git a/core/pom.xml b/core/pom.xml
index d108ba2e..60d591e2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,22 +23,28 @@
 
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-            <version>2.5</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.11</artifactId>
+            <version>${flink.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
+            <artifactId>flink-table-runtime-blink_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
@@ -56,16 +62,23 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.11</artifactId>
+            <artifactId>flink-cep-scala_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
+            <artifactId>flink-yarn_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-server</artifactId>
@@ -92,28 +105,16 @@
 
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-cep-scala_2.11</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-yarn_2.11</artifactId>
-            <version>${flink.version}</version>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
-            <version>${flink.version}</version>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.5</version>
         </dependency>
diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java
index c5b3cf7e..2550832a 100644
--- a/core/src/main/java/com/dtstack/flink/sql/Main.java
+++ b/core/src/main/java/com/dtstack/flink/sql/Main.java
@@ -26,8 +26,8 @@
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.enums.EPluginLoadMode;
+//import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.option.OptionParser;
 import com.dtstack.flink.sql.parser.CreateFuncParser;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
@@ -66,8 +66,9 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -187,7 +188,8 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,
                 //sql-dimensional table contains the dimension table of execution
                 sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
             }else{
-                FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+//                FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+                tableEnv.sqlUpdate(result.getExecSql());
                 if(LOG.isInfoEnabled()){
                     LOG.info("exec sql: " + result.getExecSql());
                 }
@@ -216,7 +218,7 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> classPathSet)
         }
     }
 
-    private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
+    private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
             throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
         //register udf
         // the udf and the tableEnv must be loaded by the same classloader
@@ -249,7 +251,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env,
                 String adaptSql = sourceTableInfo.getAdaptSelectSql();
                 Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
-                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
+                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
                 DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
                         .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
                         .returns(typeInfo);
@@ -353,12 +355,16 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties)
      * create the StreamTableEnvironment and set the related properties
      *
      * @param confProperties
-     * @param env
      * @return
      */
     private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
         confProperties = PropertiesUtils.propertiesTrim(confProperties);
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
         FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
         return tableEnv;
     }
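Note: the planner switch is the heart of this hunk — in 1.9 the table environment is built through EnvironmentSettings with the blink planner selected explicitly, replacing the removed StreamTableEnvironment.getTableEnvironment(env) factory. A standalone sketch of the new factory path (class name is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class TableEnvFactoryDemo {
        // Flink 1.9: the planner (blink vs. legacy) and mode (streaming vs. batch)
        // are chosen at creation time via EnvironmentSettings.
        public static StreamTableEnvironment create(StreamExecutionEnvironment env) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            return StreamTableEnvironment.create(env, settings);
        }
    }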
diff --git a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
index 8ac1edd4..bbeba899 100644
--- a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
+++ b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
@@ -93,7 +93,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
         // transform the streaming program into a JobGraph
         StreamGraph streamGraph = getStreamGraph();
         streamGraph.setJobName(jobName);
+        return execute(streamGraph);
+    }
 
+    @Override
+    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
         JobGraph jobGraph = streamGraph.getJobGraph();
         jobGraph.setClasspaths(classpaths);
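Note: in 1.9 both execute() overloads funnel into execute(StreamGraph) — assuming, as the @Override above implies, that the base environment exposes that method — so a custom environment only needs to override the one hook. A hypothetical sketch on top of LocalStreamEnvironment:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    // Hypothetical sketch: execute() and execute(jobName) both build a StreamGraph
    // and call execute(StreamGraph), so this one override covers every entry point.
    public class DemoLocalEnvironment extends LocalStreamEnvironment {
        @Override
        public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
            // single place to inspect or adjust the graph before submission
            return super.execute(streamGraph);
        }
    }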
diff --git a/core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java b/core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java
index 6bcc2525..ccd27c1e 100644
--- a/core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java
+++ b/core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java
@@ -1,84 +1,86 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.exec;
-
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.plan.logical.LogicalRelNode;
-import org.apache.flink.table.plan.schema.TableSinkTable;
-import org.apache.flink.table.plan.schema.TableSourceSinkTable;
-import scala.Option;
-
-import java.lang.reflect.Method;
-
-/**
- * @description: mapping by name when insert into sink table
- * @author: maqi
- * @create: 2019/08/15 11:09
- */
-public class FlinkSQLExec {
-
-    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
-
-        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
-        SqlNode insert = planner.parse(stmt);
-
-        if (!(insert instanceof SqlInsert)) {
-            throw new TableException(
-                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
-        }
-        SqlNode query = ((SqlInsert) insert).getSource();
-
-        SqlNode validatedQuery = planner.validate(query);
-
-        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
-        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
-
-        Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
-        method.setAccessible(true);
-        Option sinkTab = (Option) method.invoke(tableEnv, targetTableName);
-
-        if (sinkTab.isEmpty()) {
-            throw new ValidationException("Sink table " + targetTableName + "not found in flink");
-        }
-
-        TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
-        TableSinkTable tableSinkTable = (TableSinkTable) targetTable.tableSinkTable().get();
-        String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
-
-        Table newTable = null;
-        try {
-            newTable = queryResult.select(String.join(",", fieldNames));
-        } catch (Exception e) {
-            throw new ValidationException(
-                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
-                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
-                    "TableSink schema: " + String.join(",", fieldNames));
-        }
-
-        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
-    }
-}
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package com.dtstack.flink.sql.exec;
+//
+//import org.apache.calcite.sql.SqlIdentifier;
+//import org.apache.calcite.sql.SqlInsert;
+//import org.apache.calcite.sql.SqlNode;
+//import org.apache.flink.table.api.Table;
+//import org.apache.flink.table.api.TableEnvironment;
+//import org.apache.flink.table.api.TableException;
+//import org.apache.flink.table.api.ValidationException;
+//import org.apache.flink.table.api.java.StreamTableEnvironment;
+//import org.apache.flink.table.calcite.FlinkPlannerImpl;
+//import org.apache.flink.table.plan.logical.LogicalRelNode;
+//import org.apache.flink.table.plan.schema.TableSinkTable;
+//import org.apache.flink.table.plan.schema.TableSourceSinkTable;
+//import org.apache.flink.table.planner.sinks.TableSinkUtils;
+//import scala.Option;
+//
+//import java.lang.reflect.Method;
+//
+///**
+// * @description: mapping by name when insert into sink table
+// * @author: maqi
+// * @create: 2019/08/15 11:09
+// */
+//public class FlinkSQLExec {
+//
+//    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+//
+//        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
+//        SqlNode insert = planner.parse(stmt);
+//
+//        if (!(insert instanceof SqlInsert)) {
+//            throw new TableException(
+//                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
+//        }
+//        SqlNode query = ((SqlInsert) insert).getSource();
+//
+////        TableSinkUtils.validateSink();
+//        SqlNode validatedQuery = planner.validate(query);
+//
+//        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
+//        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
+//
+//        Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
+//        method.setAccessible(true);
+//        Option sinkTab = (Option) method.invoke(tableEnv, targetTableName);
+//
+//        if (sinkTab.isEmpty()) {
+//            throw new ValidationException("Sink table " + targetTableName + "not found in flink");
+//        }
+//
+//        TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
+//        TableSinkTable tableSinkTable = (TableSinkTable) targetTable.tableSinkTable().get();
+//        String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
+//
+//        Table newTable = null;
+//        try {
+//            newTable = queryResult.select(String.join(",", fieldNames));
+//        } catch (Exception e) {
+//            throw new ValidationException(
+//                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+//                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
+//                    "TableSink schema: " + String.join(",", fieldNames));
+//        }
+//
+//        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
+//    }
+//}
\ No newline at end of file
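Note: the legacy-planner internals FlinkSQLExec relied on (FlinkPlannerImpl, LogicalRelNode, TableSourceSinkTable) do not exist under the blink planner, so INSERT statements now go through tableEnv.sqlUpdate directly. The name-based field mapping the class provided can be approximated with public API only; a hypothetical sketch (class, method, and argument names invented):

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class NameBasedInsertDemo {
        // Reorder the query's fields to the sink's declared order before inserting,
        // approximating FlinkSQLExec's name-based mapping without planner internals.
        public static void insertByName(StreamTableEnvironment tableEnv,
                                        String selectSql, String sinkTable, String[] sinkFields) {
            Table query = tableEnv.sqlQuery(selectSql);                    // e.g. "SELECT b, a FROM src"
            Table reordered = query.select(String.join(",", sinkFields)); // e.g. {"a", "b"}
            reordered.insertInto(sinkTable);
        }
    }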
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
index f5daed81..9203746c 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
@@ -21,7 +21,6 @@
 package com.dtstack.flink.sql.side;
 
 import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
 import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
 import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
@@ -45,6 +44,7 @@
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -53,7 +53,9 @@
 import com.google.common.collect.Maps;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,7 +117,8 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
             if(pollSqlNode.getKind() == INSERT){
                 System.out.println("----------real exec sql-----------" );
                 System.out.println(pollSqlNode.toString());
-                FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
+//                FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
+                tableEnv.sqlUpdate(pollSqlNode.toString());
                 if(LOG.isInfoEnabled()){
                     LOG.info("exec sql: " + pollSqlNode.toString());
                 }
@@ -231,13 +234,25 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBasedTable<String, String, String> mappingTable) {
             mappingTable.put(tableName, fieldName, mappingFieldName);
 
-            sideOutTypes[i] = fieldInfo.getTypeInformation();
+            sideOutTypes[i] = convertTimeAttributeType(fieldInfo.getTypeInformation());
             sideOutNames[i] = mappingFieldName;
         }
 
         return new RowTypeInfo(sideOutTypes, sideOutNames);
     }
 
+    /**
+     * convert the proctime/rowtime time attributes to plain types
+     * @param typeInformation
+     * @return
+     */
+    private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
+        if (typeInformation instanceof TimeIndicatorTypeInfo) {
+            return SqlTimeTypeInfo.TIMESTAMP;
+        }
+        return typeInformation;
+    }
+
     //more cases need to be considered here
     private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
         SqlKind sqlKind = sqlNode.getKind();
@@ -705,7 +720,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
         leftScopeChild.setTableName(joinInfo.getLeftTableName());
         Table leftTable = getTableFromCache(localTableCache, joinInfo.getLeftTableAlias(), joinInfo.getLeftTableName());
-        RowTypeInfo leftTypeInfo = new RowTypeInfo(leftTable.getSchema().getTypes(), leftTable.getSchema().getColumnNames());
+        RowTypeInfo leftTypeInfo = new RowTypeInfo(leftTable.getSchema().getFieldTypes(), leftTable.getSchema().getFieldNames());
         leftScopeChild.setRowTypeInfo(leftTypeInfo);
 
         JoinScope.ScopeChild rightScopeChild = new JoinScope.ScopeChild();
@@ -738,7 +753,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
             targetTable = localTableCache.get(joinInfo.getLeftTableName());
         }
 
-        RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
+        RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
 
         DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
                 .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
@@ -772,9 +787,11 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
         replaceInfoList.add(replaceInfo);
 
-        if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
+        List<String> registeredTableName = Arrays.asList(tableEnv.listTables());
+        if (!registeredTableName.contains(joinInfo.getNewTableName())) {
             tableEnv.registerDataStream(joinInfo.getNewTableName(), dsOut, String.join(",", sideOutTypeInfo.getFieldNames()));
         }
+
     }
 
     private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
@@ -783,7 +800,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
         String[] fields = fieldsInfo.split(",");
         for (int i = 0; i < fields.length; i++) {
             String[] filed = fields[i].split("\\s");
-            if (filed.length < 2 || fields.length != table.getSchema().getColumnNames().length){
+            if (filed.length < 2 || fields.length != table.getSchema().getFieldNames().length){
                 return false;
             } else {
                 String[] filedNameArr = new String[filed.length - 1];
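Note: buildOutRowTypeInfo must strip time indicators because the blink planner types proctime/rowtime columns as TimeIndicatorTypeInfo, which carries planner-only semantics and cannot be re-registered on a plain DataStream. The same check in isolation, for reference (class name illustrative):

    import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;

    public class TimeAttributeDemo {
        // Time indicators behave like SQL timestamps but are planner-internal;
        // downgrade them to a plain TIMESTAMP type before rebuilding a RowTypeInfo.
        static TypeInformation<?> stripTimeIndicator(TypeInformation<?> type) {
            return type instanceof TimeIndicatorTypeInfo ? SqlTimeTypeInfo.TIMESTAMP : type;
        }
    }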
diff --git a/core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java b/core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java
index 28ed4d1f..133f0a2b 100644
--- a/core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java
+++ b/core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java
@@ -36,6 +36,7 @@
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -278,7 +279,7 @@ public static int getEnvParallelism(Properties properties){
      * @param tableEnv
      * @return
      */
-    public static void setTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
+    public static void setTableEnvTTL(Properties properties, TableEnvironment tableEnv) {
         String ttlMintimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MINTIME);
         String ttlMaxtimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MAXTIME);
         if (StringUtils.isNotEmpty(ttlMintimeStr) || StringUtils.isNotEmpty(ttlMaxtimeStr)) {
@@ -295,8 +296,10 @@ public static void setTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
                 ttlMaxtime = getTtlTime(Integer.parseInt(ttlMaxtimeStrMatcher.group(1)), ttlMaxtimeStrMatcher.group(2));
             }
             if (0L != ttlMintime && 0L != ttlMaxtime) {
-                StreamQueryConfig qConfig = tableEnv.queryConfig();
-                qConfig.withIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
+//                StreamQueryConfig qConfig = tableEnv.queryConfig();
+//                qConfig.withIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
+                TableConfig qConfig = tableEnv.getConfig();
+                qConfig.setIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
             }
         }
     }
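Note: idle-state TTL moves from the per-query StreamQueryConfig to the environment-wide TableConfig in 1.9, which is why setTableEnvTTL can now accept the planner-agnostic TableEnvironment. The new call path in isolation (class and method names illustrative):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.table.api.TableEnvironment;

    public class TtlConfigDemo {
        // Flink 1.9: min/max idle-state retention is set once on TableConfig
        // and applies to every query run on this environment.
        public static void applyTtl(TableEnvironment tableEnv, long minMs, long maxMs) {
            tableEnv.getConfig().setIdleStateRetentionTime(
                    Time.milliseconds(minMs), Time.milliseconds(maxMs));
        }
    }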
diff --git a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java
index f372e4d1..258dc60f 100644
--- a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java
+++ b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java
@@ -137,11 +137,18 @@ private RichSinkFunction createEsSinkFunction(){
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         RichSinkFunction richSinkFunction = createEsSinkFunction();
         DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
         if(parallelism > 0){
             streamSink.setParallelism(parallelism);
         }
+
+        return streamSink;
     }
 
     public void setParallelism(int parallelism) {
diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java
index 03d46006..b5a97f33 100644
--- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java
+++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java
@@ -28,6 +28,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -67,15 +68,21 @@ public HbaseSink genStreamSink(TargetTableInfo targetTableInfo) {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         HbaseOutputFormat.HbaseOutputFormatBuilder builder = HbaseOutputFormat.buildHbaseOutputFormat();
         builder.setHost(this.zookeeperQuorum).setZkParent(this.parent).setTable(this.tableName);
-
+
         builder.setRowkey(rowkey);
         builder.setColumnNames(fieldNames);
         HbaseOutputFormat outputFormat = builder.finish();
 
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
     }
 
     @Override
diff --git a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
index e4d5a984..4adfddf1 100644
--- a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
+++ b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
@@ -122,7 +122,6 @@ public Row deserialize(byte[] message) throws IOException {
             parseTree(root, null);
 
             Row row = new Row(fieldNames.length);
-
             for (int i = 0; i < fieldNames.length; i++) {
                 JsonNode node = getIgnoreCase(fieldNames[i]);
                 TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
diff --git a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
index 8347b0a7..71bbb7d6 100644
--- a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
+++ b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
@@ -87,7 +87,7 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnvironment env,
             types[i] = TypeInformation.of(kafka09SourceTableInfo.getFieldClasses()[i]);
         }
 
-        TypeInformation typeInformation = new RowTypeInfo(types, kafka09SourceTableInfo.getFields());
+        RowTypeInfo typeInformation = new RowTypeInfo(types, kafka09SourceTableInfo.getFields());
 
         FlinkKafkaConsumer09 kafkaSrc;
         if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
@@ -96,7 +96,6 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnvironment env,
             kafkaSrc = new CustomerKafka09Consumer(topicName,
                     new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields(), kafka09SourceTableInfo.getFieldExtraInfoList()), props);
         }
-
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
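Note: narrowing the local from TypeInformation to the concrete RowTypeInfo is deliberate — the concrete type keeps getFieldNames()/getFieldTypes() accessible, which the JSON deserializer and downstream table registration both need. In isolation (class name illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    public class RowTypeDemo {
        // Declaring the variable as RowTypeInfo rather than the TypeInformation
        // supertype preserves access to the field names and types.
        public static RowTypeInfo rowType(TypeInformation<?>[] types, String[] names) {
            RowTypeInfo rowType = new RowTypeInfo(types, names);
            System.out.println(String.join(",", rowType.getFieldNames()));
            return rowType;
        }
    }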
diff --git a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java
index 89d1543a..9055f6af 100644
--- a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java
+++ b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -59,9 +60,15 @@ protected SinkFunction createKafkaProducer(String topic, Properties properties,
 
     @Override
     public void emitDataStream(DataStream<Row> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
         SinkFunction kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
         // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
         //kafkaProducer.setFlushOnCheckpoint(true);
-        dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+        DataStreamSink<Row> dataStreamSink = dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+        return dataStreamSink;
     }
 }
\ No newline at end of file
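Note: setFlushOnCheckpoint stays commented out because it belongs to FlinkKafkaProducerBase (the 0.8/0.9 producers); for the 0.11+ producer the delivery guarantee is chosen through the Semantic enum instead. A hypothetical sketch of the at-least-once configuration (class and method names invented):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    public class KafkaProducerDemo {
        // The 0.11 producer exposes delivery semantics as a constructor argument;
        // AT_LEAST_ONCE flushes pending records on every checkpoint.
        public static <T> FlinkKafkaProducer011<T> atLeastOnce(
                String topic, KeyedSerializationSchema<T> schema, Properties props) {
            return new FlinkKafkaProducer011<>(
                    topic, schema, props, FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
        }
    }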
diff --git a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
index 5b0ac683..9016d853 100644
--- a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
+++ b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
@@ -27,6 +27,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -111,6 +112,11 @@ public TypeInformation<Row> getRecordType() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink(
                 schema,
                 topic,
@@ -123,7 +129,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                     return record.f1;
                 }).returns(getOutputType().getTypeAt(1)).setParallelism(parallelism);
 
-        kafkaTableSink.emitDataStream(ds);
+        DataStreamSink dataStreamSink = (DataStreamSink) kafkaTableSink.consumeDataStream(ds);
+        return dataStreamSink;
     }
 
     @Override
diff --git a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
index ff15db84..3a1d332b 100644
--- a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
+++ b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
@@ -286,4 +286,9 @@ private Object convert(JsonNode node, TypeInformation info) {
         }
     }
 
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return typeInfo;
+    }
+
 }
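Note: overriding getProducedType matters because Flink derives a source's output type from this method, not from generics, once the schema is wrapped by the Kafka consumer. A minimal, hypothetical schema showing the idiom (DemoRowDeserialization is invented; parsing elided):

    import java.io.IOException;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    public class DemoRowDeserialization implements DeserializationSchema<Row> {
        private final RowTypeInfo typeInfo;

        public DemoRowDeserialization(RowTypeInfo typeInfo) {
            this.typeInfo = typeInfo;
        }

        @Override
        public Row deserialize(byte[] message) throws IOException {
            return new Row(typeInfo.getArity()); // real field parsing elided
        }

        @Override
        public boolean isEndOfStream(Row nextElement) {
            return false;
        }

        // Without this, the runtime would see a generic Row with no field info.
        @Override
        public TypeInformation<Row> getProducedType() {
            return typeInfo;
        }
    }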
consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KuduOutputFormat.KuduOutputFormatBuilder builder = KuduOutputFormat.buildKuduOutputFormat();
         builder.setKuduMasters(this.kuduMasters)
                 .setTableName(this.tableName)
@@ -63,7 +69,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 .setFieldTypes(this.fieldTypes);
         KuduOutputFormat kuduOutputFormat = builder.finish();
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(kuduOutputFormat);
-        dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
     }
 
     @Override
diff --git a/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoSink.java b/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoSink.java
index 4e28d8fd..19800f16 100644
--- a/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoSink.java
+++ b/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoSink.java
@@ -27,6 +27,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -66,6 +67,11 @@ public MongoSink genStreamSink(TargetTableInfo targetTableInfo) {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         MongoOutputFormat.MongoOutputFormatBuilder builder = MongoOutputFormat.buildMongoOutputFormat();
         builder.setAddress(this.address)
                 .setDatabase(this.database)
@@ -77,7 +83,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         MongoOutputFormat outputFormat = builder.finish();
 
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
     }
 
     @Override
diff --git a/mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java b/mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java
index 9ba6736b..8feca538 100644
--- a/mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java
+++ b/mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java
@@ -24,6 +24,9 @@
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
 import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
 
 import java.util.List;
 import java.util.Map;
diff --git a/pom.xml b/pom.xml
index b4c3478b..6c4730e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <flink.version>1.8.1</flink.version>
+        <flink.version>1.9.1</flink.version>
     </properties>
diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java
index 5fc7ba15..4f56499a 100644
--- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java
+++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java
@@ -26,17 +26,21 @@
 
 import org.apache.commons.collections.CollectionUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.xml.datatype.DatatypeConstants;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
@@ -73,10 +77,10 @@ public Row fillData(Row input, Object sideInput) {
             boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
 
             //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
-            if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-                obj = ((Timestamp) obj).getTime();
+            // flink 1.9 blink planner hands proctime values over as LocalDateTime, so convert back to a SQL TIMESTAMP
+            if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
+                obj = Timestamp.valueOf((LocalDateTime) obj);
             }
-
             row.setField(entry.getKey(), obj);
         }
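Note: under the blink planner, TIMESTAMP columns reach fillData as java.time.LocalDateTime instead of java.sql.Timestamp, hence the new branch; Timestamp.valueOf performs the lossless conversion. The same normalization in isolation (class name illustrative):

    import java.sql.Timestamp;
    import java.time.LocalDateTime;

    public class TimeConvertDemo {
        // Convert blink-planner LocalDateTime values back to java.sql.Timestamp
        // for sinks and serializers that still expect the JDBC type.
        public static Object normalize(Object field) {
            return field instanceof LocalDateTime
                    ? Timestamp.valueOf((LocalDateTime) field)
                    : field;
        }
    }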
diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java
index babff68f..c7effea2 100644
--- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java
+++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java
@@ -197,15 +197,6 @@ protected void buildSqlTypes(List<Class> fieldTypeArray) {
 
     }
 
-    @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        RichSinkFunction richSinkFunction = createJdbcSinkFunc();
-        DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
-        streamSink.name(registerTabName);
-        if (parallelism > 0) {
-            streamSink.setParallelism(parallelism);
-        }
-    }
 
     @Override
     public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
@@ -274,4 +265,20 @@ public void setDbType(String dbType) {
 
     public abstract RetractJDBCOutputFormat getOutputFormat();
 
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        RichSinkFunction richSinkFunction = createJdbcSinkFunc();
+        DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
+        streamSink.name(registerTabName);
+        if (parallelism > 0) {
+            streamSink.setParallelism(parallelism);
+        }
+        return streamSink;
+    }
+
 }
diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java
index d2e28c01..b7151960 100644
--- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java
+++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -90,6 +91,11 @@ public TypeInformation<Row> getRecordType() {
 
     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    @Override
+    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         RedisOutputFormat.RedisOutputFormatBuilder builder = RedisOutputFormat.buildRedisOutputFormat();
         builder.setUrl(this.url)
                 .setDatabase(this.database)
@@ -106,7 +112,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 .setMasterName(this.masterName);
         RedisOutputFormat redisOutputFormat = builder.finish();
 
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(redisOutputFormat);
-        dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        return dataStreamSink;
     }
 
     @Override