[fix-33847][core][kafka] Map and Array type support in the kafka sink.
a49a committed Dec 30, 2020
1 parent 089c88b commit f3552de
Showing 6 changed files with 83 additions and 6 deletions.
@@ -382,10 +382,14 @@ public static Set<URL> registerTable(
, pluginLoadMode);
pluginClassPathSets.add(sourceTablePathUrl);
} else if (tableInfo instanceof AbstractTargetTableInfo) {

TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
TypeInformation[] flinkTypes = DataTypeUtils.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
// TODO The Kafka sink can be registered directly; the other sinks need fixing before they can take this path.
if (tableInfo.getType().startsWith("kafka")) {
tableEnv.registerTableSink(tableInfo.getName(), tableSink);
} else {
TypeInformation[] flinkTypes = DataTypeUtils.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
}

URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(
tableInfo.getType()
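For context: class-based type derivation cannot represent composite columns, which is why the Kafka sink is now registered without explicit field types. A minimal standalone sketch of the difference (hypothetical demo class, not part of the commit; assumes Flink 1.x's `TypeInformation` API):

```java
import java.util.HashMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class MapTypeErasureDemo {
    public static void main(String[] args) {
        // Derived from the field's Java class: key/value types are erased,
        // so Flink falls back to a generic (Kryo-serialized) type.
        TypeInformation<HashMap> fromClass = TypeInformation.of(HashMap.class);
        System.out.println(fromClass); // GenericType<java.util.HashMap>

        // Built from the DDL declaration instead: key/value types survive.
        TypeInformation<?> fromDdl = Types.MAP(Types.STRING, Types.INT);
        System.out.println(fromDdl);
    }
}
```

The single-argument `registerTableSink(name, tableSink)` overload trusts the schema the configured sink reports, so `MAP` and `ARRAY` columns can reach the serializer intact.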
6 changes: 6 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java
@@ -25,6 +25,8 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.HashMap;

/**
* Reason: TODO ADD REASON (optional)
@@ -41,6 +43,10 @@ public static Class<?> stringConvertClass(String str) {
if (lowerStr.startsWith("array")) {
return Array.newInstance(Integer.class, 0).getClass();
}
if (lowerStr.startsWith("map")) {
// MAP<k, v> columns are represented by java.util.HashMap
return HashMap.class;
}

switch (lowerStr) {
case "boolean":
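A usage sketch for the updated `ClassUtil.stringConvertClass` (hypothetical demo class; expected values inferred from the branches above, assuming the method lowercases its input as the `lowerStr` name suggests):

```java
import java.util.HashMap;
import com.dtstack.flink.sql.util.ClassUtil;

public class ClassUtilSketch {
    public static void main(String[] args) {
        // Any declaration starting with "map" maps to HashMap.class.
        Class<?> mapClass = ClassUtil.stringConvertClass("MAP<STRING, INT>");
        System.out.println(mapClass == HashMap.class);   // true

        // The "array" branch returns Integer[].class regardless of element type.
        Class<?> arrayClass = ClassUtil.stringConvertClass("ARRAY<INT>");
        System.out.println(arrayClass);                  // class [Ljava.lang.Integer;
    }
}
```

Returning `HashMap.class` here is what later lets the Kafka sink recognize MAP columns and re-parse their full type from the DDL string.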
29 changes: 29 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java
@@ -21,6 +21,7 @@
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -46,6 +47,8 @@
import static org.apache.flink.table.api.DataTypes.DECIMAL;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;

/**
* @program: flink.sql
* @author: wuren
@@ -55,6 +58,7 @@ public class DataTypeUtils {

private final static Pattern COMPOSITE_TYPE_PATTERN = Pattern.compile("(.+?)<(.+)>");
private final static String ARRAY = "ARRAY";
private final static String MAP = "MAP";
private final static String ROW = "ROW";
private final static char FIELD_DELIMITER = ',';
private final static char TYPE_DELIMITER = ' ';
@@ -105,6 +109,30 @@ public static TypeInformation convertToArray(String arrayTypeString) {
return Types.OBJECT_ARRAY(elementType);
}

/**
* Currently only atomic types are supported as MAP key and value types.
* @param mapTypeString the type declaration from the DDL, e.g. "MAP<STRING, INT>"
* @return the corresponding map TypeInformation
*/
public static TypeInformation convertToMap(String mapTypeString) {
Matcher matcher = matchCompositeType(mapTypeString);
final String errorMsg = mapTypeString + " cannot be converted to a map type!";
Preconditions.checkState(matcher.find(), errorMsg);

String normalizedType = normalizeType(matcher.group(1));
Preconditions.checkState(MAP.equals(normalizedType), errorMsg);

String kvTypeString = matcher.group(2);
String[] kvTypeStringList = StringUtils.split(kvTypeString, ",");
final String mapTypeErrorMsg = "A map declaration must have exactly two type arguments: a key type and a value type.";
Preconditions.checkState(kvTypeStringList.length == 2, mapTypeErrorMsg);
String keyTypeString = normalizeType(kvTypeStringList[0]);
String valueTypeString = normalizeType(kvTypeStringList[1]);
TypeInformation keyType = convertToAtomicType(keyTypeString);
TypeInformation valueType = convertToAtomicType(valueTypeString);
return Types.MAP(keyType, valueType);
}

/**
* Currently only atomic types are supported inside a ROW.
*
@@ -158,6 +186,7 @@ public static TypeInformation<Row> getRowTypeInformation(String[] fieldTypes, Cl
return new RowTypeInfo(types, fieldTypes);
}


private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> fieldInfoStrs) {
ArrayList<TypeInformation> types = Lists.newArrayList();
ArrayList<String> fieldNames = Lists.newArrayList();
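A usage sketch for the new `convertToMap` helper (hypothetical demo class; expected behavior inferred from the code above, assuming `normalizeType` trims and uppercases as elsewhere in this class). Because the type arguments are split on `','`, nested composite types such as `MAP<STRING, ARRAY<INT>>` are not supported, consistent with the javadoc:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.dtstack.flink.sql.util.DataTypeUtils;

public class ConvertToMapSketch {
    public static void main(String[] args) {
        // Atomic key/value types parse cleanly.
        TypeInformation mapType = DataTypeUtils.convertToMap("MAP<STRING, INT>");
        System.out.println(mapType); // a map TypeInformation with STRING keys, INT values

        // More (or fewer) than two type arguments fail the precondition:
        // Preconditions.checkState throws IllegalStateException here.
        // DataTypeUtils.convertToMap("MAP<STRING, INT, INT>");
    }
}
```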
26 changes: 26 additions & 0 deletions docs/plugin/kafkaSink.md
@@ -221,3 +221,29 @@ into
from
MyTable a
```
## MAP type example
The Kafka sink currently supports the MAP type:
```sql
CREATE TABLE ods(
id INT,
name STRING
) WITH (
...
);

CREATE TABLE dwd (
id INT,
dids MAP<STRING, INT>
) WITH (
type ='kafka',
bootstrapServers ='localhost:9092',
offsetReset ='latest',
groupId='wuren_foo',
topic ='luna_foo',
parallelism ='1'
);

INSERT INTO dwd
SELECT ods.id, MAP['foo', 1, 'bar', 2] AS dids
FROM ods;
```
2 changes: 1 addition & 1 deletion docs/pluginsInfo.md
@@ -1,9 +1,9 @@
### 1 Plugin list
#### 1.1 Source-table plugins
* [kafka source-table plugin](plugin/kafkaSource.md)
* [kafka result-table plugin](plugin/kafkaSink.md)

#### 1.2 Result-table plugins
* [kafka result-table plugin](plugin/kafkaSink.md)
* [elasticsearch result-table plugin](plugin/elasticsearchSink.md)
* [hbase result-table plugin](plugin/hbaseSink.md)
* [mysql result-table plugin](plugin/mysqlSink.md)
@@ -21,6 +21,7 @@
import com.dtstack.flink.sql.enums.EUpdateMode;
import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import com.dtstack.flink.sql.util.DataTypeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,7 @@
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.IntStream;
@@ -77,11 +79,21 @@ protected Properties getKafkaProperties(KafkaSinkTableInfo KafkaSinkTableInfo) {
}
return props;
}

// TODO The source has an identical method; the two could be merged later.
protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafka11SinkTableInfo) {
String[] fieldTypes = kafka11SinkTableInfo.getFieldTypes();
Class<?>[] fieldClasses = kafka11SinkTableInfo.getFieldClasses();
TypeInformation[] types = IntStream.range(0, fieldClasses.length)
.mapToObj(i -> TypeInformation.of(fieldClasses[i]))
.mapToObj(
i -> {
if (fieldClasses[i].isArray()) {
return DataTypeUtils.convertToArray(fieldTypes[i]);
}
if (fieldClasses[i] == HashMap.class) {
return DataTypeUtils.convertToMap(fieldTypes[i]);
}
return TypeInformation.of(fieldClasses[i]);
})
.toArray(TypeInformation[]::new);
return types;
}
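A standalone sketch of the per-field dispatch in `getTypeInformations` (hypothetical class and method names, not part of the commit), showing how a `MAP<STRING, INT>` column keeps its key/value types on the way to the serialization schema:

```java
import java.util.HashMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.dtstack.flink.sql.util.DataTypeUtils;

public class SinkFieldTypeSketch {
    // Mirrors the lambda above: composite classes are re-parsed from the
    // DDL string, everything else falls back to TypeInformation.of.
    static TypeInformation<?> toTypeInformation(String fieldType, Class<?> fieldClass) {
        if (fieldClass.isArray()) {
            return DataTypeUtils.convertToArray(fieldType);  // e.g. ARRAY<INT>
        }
        if (fieldClass == HashMap.class) {
            return DataTypeUtils.convertToMap(fieldType);    // e.g. MAP<STRING, INT>
        }
        return TypeInformation.of(fieldClass);               // atomic types
    }

    public static void main(String[] args) {
        // HashMap.class is what ClassUtil.stringConvertClass derives for "map...".
        System.out.println(toTypeInformation("MAP<STRING, INT>", HashMap.class));
    }
}
```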
