Skip to content

Commit a9bc5ca

Browse files
authored
[Feature][API] Support hocon style declare row type in generic type (apache#6187)
1 parent a09d3bb commit a9bc5ca

File tree

8 files changed

+257
-46
lines changed

8 files changed

+257
-46
lines changed

docs/en/concept/schema-feature.md

+51-19
Large diffs are not rendered by default.

release-note.md

+1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
- [Core] [Shade] Add guava shade module (#4358)
166166
- [Core] [Spark] Support SeaTunnel Time Type (#5188)
167167
- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419)
168+
- [Core] [API] Support hocon style declare row type in generic type (#6187)
168169

169170
### Connector-V2
170171

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java

+56-23
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package org.apache.seatunnel.api.table.catalog;
1919

20-
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
22+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
23+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
2124

2225
import org.apache.seatunnel.api.table.type.ArrayType;
2326
import org.apache.seatunnel.api.table.type.BasicType;
@@ -29,9 +32,6 @@
2932
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3033
import org.apache.seatunnel.api.table.type.SqlType;
3134
import org.apache.seatunnel.common.exception.CommonError;
32-
import org.apache.seatunnel.common.utils.JsonUtils;
33-
34-
import java.util.Map;
3535

3636
public class SeaTunnelDataTypeConvertorUtil {
3737

@@ -87,13 +87,13 @@ public static SeaTunnelDataType<?> deserializeSeaTunnelDataType(
8787
private static SeaTunnelDataType<?> parseComplexDataType(String field, String columnStr) {
8888
String column = columnStr.toUpperCase().replace(" ", "");
8989
if (column.startsWith(SqlType.MAP.name())) {
90-
return parseMapType(field, column);
90+
return parseMapType(field, columnStr);
9191
}
9292
if (column.startsWith(SqlType.ARRAY.name())) {
93-
return parseArrayType(field, column);
93+
return parseArrayType(field, columnStr);
9494
}
9595
if (column.startsWith(SqlType.DECIMAL.name())) {
96-
return parseDecimalType(column);
96+
return parseDecimalType(columnStr);
9797
}
9898
if (column.trim().startsWith("{")) {
9999
return parseRowType(columnStr);
@@ -102,31 +102,64 @@ private static SeaTunnelDataType<?> parseComplexDataType(String field, String co
102102
}
103103

104104
private static SeaTunnelDataType<?> parseRowType(String columnStr) {
105-
ObjectNode jsonNodes = JsonUtils.parseObject(columnStr);
106-
Map<String, String> fieldsMap = JsonUtils.toStringMap(jsonNodes);
107-
String[] fieldsName = new String[fieldsMap.size()];
108-
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[fieldsMap.size()];
109-
int i = 0;
110-
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
111-
fieldsName[i] = entry.getKey();
112-
seaTunnelDataTypes[i] = deserializeSeaTunnelDataType(entry.getKey(), entry.getValue());
113-
i++;
105+
String confPayload = "{conf = " + columnStr + "}";
106+
Config conf;
107+
try {
108+
conf = ConfigFactory.parseString(confPayload);
109+
} catch (RuntimeException e) {
110+
throw new IllegalArgumentException(
111+
String.format("HOCON Config parse from %s failed.", confPayload), e);
112+
}
113+
return parseRowType(conf.getObject("conf"));
114+
}
115+
116+
private static SeaTunnelDataType<?> parseRowType(ConfigObject conf) {
117+
String[] fieldNames = new String[conf.size()];
118+
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[conf.size()];
119+
conf.keySet().toArray(fieldNames);
120+
121+
for (int idx = 0; idx < fieldNames.length; idx++) {
122+
String fieldName = fieldNames[idx];
123+
ConfigValue typeVal = conf.get(fieldName);
124+
switch (typeVal.valueType()) {
125+
case STRING:
126+
{
127+
fieldTypes[idx] =
128+
deserializeSeaTunnelDataType(
129+
fieldNames[idx], (String) typeVal.unwrapped());
130+
}
131+
break;
132+
case OBJECT:
133+
{
134+
fieldTypes[idx] = parseRowType((ConfigObject) typeVal);
135+
}
136+
break;
137+
case LIST:
138+
case NUMBER:
139+
case BOOLEAN:
140+
case NULL:
141+
default:
142+
throw new IllegalArgumentException(
143+
String.format(
144+
"Unsupported parse SeaTunnel Type from '%s'.",
145+
typeVal.unwrapped()));
146+
}
114147
}
115-
return new SeaTunnelRowType(fieldsName, seaTunnelDataTypes);
148+
return new SeaTunnelRowType(fieldNames, fieldTypes);
116149
}
117150

118151
private static SeaTunnelDataType<?> parseMapType(String field, String columnStr) {
119-
String genericType = getGenericType(columnStr);
152+
String genericType = getGenericType(columnStr).trim();
120153
int index =
121-
genericType.startsWith(SqlType.DECIMAL.name())
154+
genericType.toUpperCase().startsWith(SqlType.DECIMAL.name())
122155
?
123156
// if map key is decimal, we should find the index of second ','
124157
genericType.indexOf(",", genericType.indexOf(",") + 1)
125158
:
126159
// if map key is not decimal, we should find the index of first ','
127160
genericType.indexOf(",");
128-
String keyGenericType = genericType.substring(0, index);
129-
String valueGenericType = genericType.substring(index + 1);
161+
String keyGenericType = genericType.substring(0, index).trim();
162+
String valueGenericType = genericType.substring(index + 1).trim();
130163
return new MapType<>(
131164
deserializeSeaTunnelDataType(field, keyGenericType),
132165
deserializeSeaTunnelDataType(field, valueGenericType));
@@ -138,7 +171,7 @@ private static String getGenericType(String columnStr) {
138171
}
139172

140173
private static SeaTunnelDataType<?> parseArrayType(String field, String columnStr) {
141-
String genericType = getGenericType(columnStr);
174+
String genericType = getGenericType(columnStr).trim();
142175
SeaTunnelDataType<?> dataType = deserializeSeaTunnelDataType(field, genericType);
143176
switch (dataType.getSqlType()) {
144177
case STRING:
@@ -158,7 +191,7 @@ private static SeaTunnelDataType<?> parseArrayType(String field, String columnSt
158191
case DOUBLE:
159192
return ArrayType.DOUBLE_ARRAY_TYPE;
160193
default:
161-
throw CommonError.unsupportedDataType("SeaTunnel", columnStr, field);
194+
throw CommonError.unsupportedDataType("SeaTunnel", genericType, field);
162195
}
163196
}
164197

seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.api.table.type.DecimalType;
2828
import org.apache.seatunnel.api.table.type.MapType;
2929
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
30+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3031
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3132
import org.apache.seatunnel.api.table.type.SqlType;
3233
import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -137,6 +138,42 @@ public void testCatalogUtilGetCatalogTable() throws FileNotFoundException, URISy
137138
Thread.currentThread().getContextClassLoader()));
138139
}
139140

141+
@Test
142+
public void testGenericRowSchemaTest() throws FileNotFoundException, URISyntaxException {
143+
String path = getTestConfigFile("/conf/generic_row.schema.conf");
144+
Config config = ConfigFactory.parseFile(new File(path));
145+
SeaTunnelRowType seaTunnelRowType =
146+
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
147+
Assertions.assertNotNull(seaTunnelRowType);
148+
Assertions.assertArrayEquals(
149+
new String[] {"map0", "map1"}, seaTunnelRowType.getFieldNames());
150+
151+
MapType<String, SeaTunnelRowType> mapType0 =
152+
(MapType<String, SeaTunnelRowType>) seaTunnelRowType.getFieldType(0);
153+
MapType<String, SeaTunnelRowType> mapType1 =
154+
(MapType<String, SeaTunnelRowType>) seaTunnelRowType.getFieldType(1);
155+
Assertions.assertNotNull(mapType0);
156+
Assertions.assertNotNull(mapType1);
157+
Assertions.assertEquals(BasicType.STRING_TYPE, mapType0.getKeyType());
158+
159+
SeaTunnelRowType expectedVal =
160+
new SeaTunnelRowType(
161+
new String[] {"c_int", "c_string", "c_row"},
162+
new SeaTunnelDataType[] {
163+
BasicType.INT_TYPE,
164+
BasicType.STRING_TYPE,
165+
new SeaTunnelRowType(
166+
new String[] {"c_int"},
167+
new SeaTunnelDataType[] {BasicType.INT_TYPE})
168+
});
169+
SeaTunnelRowType mapType0ValType =
170+
(SeaTunnelRowType) ((SeaTunnelDataType<?>) mapType0.getValueType());
171+
Assertions.assertEquals(expectedVal, mapType0ValType);
172+
SeaTunnelRowType mapType1ValType =
173+
(SeaTunnelRowType) ((SeaTunnelDataType<?>) mapType1.getValueType());
174+
Assertions.assertEquals(expectedVal, mapType1ValType);
175+
}
176+
140177
public static String getTestConfigFile(String configFile)
141178
throws FileNotFoundException, URISyntaxException {
142179
URL resource = CatalogTableUtilTest.class.getResource(configFile);

seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,27 @@ void testParseWithUnsupportedType() {
6666
"ErrorCode:[COMMON-07], ErrorDescription:['SeaTunnel' unsupported data type 'uuid' of 'test']",
6767
exception4.getMessage());
6868

69-
RuntimeException exception5 =
69+
IllegalArgumentException exception5 =
7070
Assertions.assertThrows(
71-
RuntimeException.class,
71+
IllegalArgumentException.class,
7272
() ->
7373
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
7474
"test", "{uuid}"));
75-
Assertions.assertEquals(
76-
"String json deserialization exception.{uuid}", exception5.getMessage());
75+
String expectedMsg5 =
76+
String.format("HOCON Config parse from %s failed.", "{conf = {uuid}}");
77+
Assertions.assertEquals(expectedMsg5, exception5.getMessage());
78+
79+
String invalidTypeDeclaration = "[e]";
80+
IllegalArgumentException exception6 =
81+
Assertions.assertThrows(
82+
IllegalArgumentException.class,
83+
() ->
84+
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
85+
"test",
86+
String.format("{c_0 = %s}", invalidTypeDeclaration)));
87+
String expectedMsg6 =
88+
String.format(
89+
"Unsupported parse SeaTunnel Type from '%s'.", invalidTypeDeclaration);
90+
Assertions.assertEquals(expectedMsg6, exception6.getMessage());
7791
}
7892
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
schema {
19+
fields {
20+
# Hocon style declare row type in generic type
21+
map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int = int}}>"
22+
# Json style declare row type in generic type
23+
map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\", \"c_row\":{\"c_int\":\"int\"}}>"
24+
}
25+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java

+3
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,8 @@ public void testFakeConnector(TestContainer container)
3838
Container.ExecResult fakeWithTemplate =
3939
container.executeJob("/fake_to_assert_with_template.conf");
4040
Assertions.assertEquals(0, fakeWithTemplate.getExitCode());
41+
Container.ExecResult fakeComplex =
42+
container.executeJob("/fake_generic_row_type_to_assert.conf");
43+
Assertions.assertEquals(0, fakeWithTemplate.getExitCode());
4144
}
4245
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = BATCH
21+
# checkpoint.interval = 10000
22+
}
23+
24+
source {
25+
FakeSource {
26+
row.num = 1
27+
schema = {
28+
fields {
29+
c_0 = "map<string, {c_int=int\nc_string=string}>"
30+
c_1 = "map<string, {c_int=int,c_string=string}>"
31+
c_2 = "map<string, {c_int=int,c_string=string,c_row={c_int=int}}>"
32+
c_3 = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
33+
}
34+
}
35+
result_table_name = "fake"
36+
}
37+
}
38+
39+
sink{
40+
Assert {
41+
source_table_name = "fake"
42+
rules =
43+
{
44+
catalog_table_rule {
45+
column_rule = [
46+
{
47+
name = "c_0"
48+
type = "map<string, {c_int=int\nc_string=string}>"
49+
}
50+
{
51+
name = "c_1"
52+
type = "map<string, {c_int=int,c_string=string}>"
53+
}
54+
{
55+
name = "c_2"
56+
type = "map<string, {c_int=int,c_string=string,c_row={c_int=int}}>"
57+
}
58+
{
59+
name = "c_3"
60+
type = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
61+
}
62+
]
63+
}
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)