Skip to content

Commit 7348be2

Browse files
authored
[Feature][Flink] Support Decimal Type with configurable precision and scale (apache#5419)
1 parent e556170 commit 7348be2

File tree

13 files changed

+141
-29
lines changed

13 files changed

+141
-29
lines changed

release-note.md

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
- [Core] [API] Add copy method to Catalog codes (#4414)
149149
- [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
150150
- [Core] [Shade] Add guava shade module (#4358)
151+
- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419)
151152

152153
### Connector-V2
153154

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.env.EnvCommonOptions;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2324
import org.apache.seatunnel.common.Constants;
2425
import org.apache.seatunnel.common.config.CheckResult;
2526
import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@
5152

5253
import java.net.URL;
5354
import java.util.ArrayList;
55+
import java.util.LinkedHashMap;
5456
import java.util.List;
57+
import java.util.Map;
5558
import java.util.Objects;
59+
import java.util.Optional;
5660
import java.util.stream.Collectors;
5761

5862
@Slf4j
@@ -64,7 +68,8 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
6468
private StreamExecutionEnvironment environment;
6569

6670
private StreamTableEnvironment tableEnvironment;
67-
71+
private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
72+
private Optional<SeaTunnelRowType> defaultType = Optional.empty();
6873
private JobMode jobMode;
6974

7075
private String jobName = Constants.LOGO;
@@ -334,6 +339,24 @@ public void registerResultTable(
334339
name, tableEnvironment.fromChangelogStream(dataStream));
335340
}
336341

342+
/**
 * Stages the row type for the given table name so that it can be looked up later via
 * {@link #type(String)}. An entry already staged under the same name is replaced.
 *
 * @param tblName table name used as the lookup key
 * @param type row type to associate with {@code tblName}
 */
public void stageType(String tblName, SeaTunnelRowType type) {
343+
stagedTypes.put(tblName, type);
344+
}
345+
346+
/**
 * Sets the default row type, overwriting any previously staged default.
 *
 * @param type the default row type; must not be {@code null} because it is wrapped with
 *     {@link Optional#of(Object)}
 * @throws NullPointerException if {@code type} is {@code null}
 */
public void stageDefaultType(SeaTunnelRowType type) {
347+
this.defaultType = Optional.of(type);
348+
}
349+
350+
public Optional<SeaTunnelRowType> type(String tblName) {
351+
return stagedTypes.containsKey(tblName)
352+
? Optional.of(stagedTypes.get(tblName))
353+
: Optional.empty();
354+
}
355+
356+
/**
 * Returns the default row type.
 *
 * @return the type set through {@link #stageDefaultType(SeaTunnelRowType)}, or an empty
 *     {@link Optional} if none has been staged
 */
public Optional<SeaTunnelRowType> defaultType() {
357+
return this.defaultType;
358+
}
359+
337360
public static FlinkRuntimeEnvironment getInstance(Config config) {
338361
if (INSTANCE == null) {
339362
synchronized (FlinkRuntimeEnvironment.class) {

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
3232
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
3333
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
34-
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
3534

3635
import org.apache.flink.streaming.api.datastream.DataStream;
3736
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -101,8 +100,8 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
101100
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
102101
plugins.get(i);
103102
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
104-
seaTunnelSink.setTypeInfo(
105-
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
103+
SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
104+
seaTunnelSink.setTypeInfo(sourceType);
106105
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
107106
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
108107
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java

+32
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.common.JobContext;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2324
import org.apache.seatunnel.common.utils.ReflectionUtils;
2425
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
2526
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
27+
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
2628

2729
import org.apache.flink.streaming.api.datastream.DataStream;
2830
import org.apache.flink.table.api.Table;
@@ -117,6 +119,36 @@ protected void registerAppendStream(Config pluginConfig) {
117119
}
118120
}
119121

122+
protected void stageType(Config pluginConfig, SeaTunnelRowType type) {
123+
if (!flinkRuntimeEnvironment.defaultType().isPresent()) {
124+
flinkRuntimeEnvironment.stageDefaultType(type);
125+
}
126+
127+
if (pluginConfig.hasPath("result_table_name")) {
128+
String tblName = pluginConfig.getString("result_table_name");
129+
flinkRuntimeEnvironment.stageType(tblName, type);
130+
}
131+
}
132+
133+
protected Optional<SeaTunnelRowType> sourceType(Config pluginConfig) {
134+
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
135+
String tblName = pluginConfig.getString(SOURCE_TABLE_NAME);
136+
return flinkRuntimeEnvironment.type(tblName);
137+
} else {
138+
return flinkRuntimeEnvironment.defaultType();
139+
}
140+
}
141+
142+
protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream<Row> stream) {
143+
SeaTunnelRowType sourceType =
144+
sourceType(sinkConfig)
145+
.orElseGet(
146+
() ->
147+
(SeaTunnelRowType)
148+
TypeConverterUtils.convert(stream.getType()));
149+
return sourceType;
150+
}
151+
120152
protected abstract List<T> initializePlugins(
121153
List<URL> jarPaths, List<? extends Config> pluginConfigs);
122154
}

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java

+25
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.env.EnvCommonOptions;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2324
import org.apache.seatunnel.common.Constants;
2425
import org.apache.seatunnel.common.config.CheckResult;
2526
import org.apache.seatunnel.common.constants.JobMode;
@@ -51,8 +52,11 @@
5152

5253
import java.net.URL;
5354
import java.util.ArrayList;
55+
import java.util.LinkedHashMap;
5456
import java.util.List;
57+
import java.util.Map;
5558
import java.util.Objects;
59+
import java.util.Optional;
5660
import java.util.stream.Collectors;
5761

5862
@Slf4j
@@ -65,6 +69,9 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
6569

6670
private StreamTableEnvironment tableEnvironment;
6771

72+
private Map<String, SeaTunnelRowType> stagedTypes = new LinkedHashMap<>();
73+
private Optional<SeaTunnelRowType> defaultType = Optional.empty();
74+
6875
private JobMode jobMode;
6976

7077
private String jobName = Constants.LOGO;
@@ -334,6 +341,24 @@ public void registerResultTable(
334341
name, tableEnvironment.fromChangelogStream(dataStream));
335342
}
336343

344+
/**
 * Stages the row type for the given table name so that it can be looked up later via
 * {@link #type(String)}. An entry already staged under the same name is replaced.
 *
 * @param tblName table name used as the lookup key
 * @param type row type to associate with {@code tblName}
 */
public void stageType(String tblName, SeaTunnelRowType type) {
345+
stagedTypes.put(tblName, type);
346+
}
347+
348+
/**
 * Sets the default row type, overwriting any previously staged default.
 *
 * @param type the default row type; must not be {@code null} because it is wrapped with
 *     {@link Optional#of(Object)}
 * @throws NullPointerException if {@code type} is {@code null}
 */
public void stageDefaultType(SeaTunnelRowType type) {
349+
this.defaultType = Optional.of(type);
350+
}
351+
352+
public Optional<SeaTunnelRowType> type(String tblName) {
353+
return stagedTypes.containsKey(tblName)
354+
? Optional.of(stagedTypes.get(tblName))
355+
: Optional.empty();
356+
}
357+
358+
/**
 * Returns the default row type.
 *
 * @return the type set through {@link #stageDefaultType(SeaTunnelRowType)}, or an empty
 *     {@link Optional} if none has been staged
 */
public Optional<SeaTunnelRowType> defaultType() {
359+
return this.defaultType;
360+
}
361+
337362
public static FlinkRuntimeEnvironment getInstance(Config config) {
338363
if (INSTANCE == null) {
339364
synchronized (FlinkRuntimeEnvironment.class) {

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
3232
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
3333
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
34-
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
3534

3635
import org.apache.flink.streaming.api.datastream.DataStream;
3736
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -102,8 +101,8 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
102101
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
103102
plugins.get(i);
104103
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
105-
seaTunnelSink.setTypeInfo(
106-
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
104+
SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream);
105+
seaTunnelSink.setTypeInfo(sourceType);
107106
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
108107
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
109108
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.seatunnel.api.common.JobContext;
2424
import org.apache.seatunnel.api.source.SeaTunnelSource;
2525
import org.apache.seatunnel.api.source.SupportCoordinate;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2627
import org.apache.seatunnel.common.constants.JobMode;
2728
import org.apache.seatunnel.core.starter.enums.PluginType;
2829
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -76,12 +77,15 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
7677
boolean bounded =
7778
internalSource.getBoundedness()
7879
== org.apache.seatunnel.api.source.Boundedness.BOUNDED;
80+
7981
DataStreamSource<Row> sourceStream =
8082
addSource(
8183
executionEnvironment,
8284
sourceFunction,
8385
"SeaTunnel " + internalSource.getClass().getSimpleName(),
8486
bounded);
87+
stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType());
88+
8589
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
8690
int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
8791
sourceStream.setParallelism(parallelism);

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2525
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2626
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
2727
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -97,7 +97,10 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
9797
SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
9898
Config pluginConfig = pluginConfigs.get(i);
9999
DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
100-
input = flinkTransform(transform, stream);
100+
SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream);
101+
transform.setTypeInfo(sourceType);
102+
input = flinkTransform(sourceType, transform, stream);
103+
stageType(pluginConfig, (SeaTunnelRowType) transform.getProducedType());
101104
registerResultTable(pluginConfig, input);
102105
result.add(input);
103106
} catch (Exception e) {
@@ -111,11 +114,10 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
111114
return result;
112115
}
113116

114-
protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
115-
SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
116-
transform.setTypeInfo(seaTunnelDataType);
117+
protected DataStream<Row> flinkTransform(
118+
SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
117119
TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
118-
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
120+
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
119121
FlinkRowConverter transformOutputRowConverter =
120122
new FlinkRowConverter(transform.getProducedType());
121123
DataStream<Row> output =

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ env {
2727

2828
source {
2929
FakeSource {
30+
row.num = 100000
3031
schema = {
3132
fields {
3233
c_map = "map<string, string>"

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ sink {
4444
row_rules = [
4545
{
4646
rule_type = MAX_ROW
47-
rule_value = 5
47+
rule_value = 100000
4848
}
4949
],
5050
field_rules = [

seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,14 @@ public void convertShortType() {
8383

8484
@Test
8585
public void convertBigDecimalType() {
86-
Assertions.assertEquals(
87-
BasicTypeInfo.BIG_DEC_TYPE_INFO,
88-
TypeConverterUtils.convert(new DecimalType(30, 2)));
86+
/**
87+
* To avoid losing the precision and scale of {@link
88+
* org.apache.seatunnel.api.table.type.DecimalType}, use {@link
89+
* org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the convert
90+
* result of {@link org.apache.seatunnel.api.table.type.DecimalType} instance.
91+
*/
92+
Assertions.assertEquals(
93+
BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2)));
8994
}
9095

9196
@Test

seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java

+24
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.translation.flink.serialization;
1919

20+
import org.apache.seatunnel.api.table.type.DecimalType;
2021
import org.apache.seatunnel.api.table.type.MapType;
2122
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,6 +29,8 @@
2829
import org.apache.flink.types.RowKind;
2930

3031
import java.io.IOException;
32+
import java.math.BigDecimal;
33+
import java.math.RoundingMode;
3134
import java.util.HashMap;
3235
import java.util.Map;
3336
import java.util.function.BiFunction;
@@ -68,6 +71,15 @@ private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
6871
case MAP:
6972
return convertMap(
7073
(Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
74+
75+
/**
76+
* To avoid losing the precision and scale of {@link
77+
* org.apache.seatunnel.api.table.type.DecimalType}, use {@link java.lang.String} as
78+
* the convert result of {@link java.math.BigDecimal} instance.
79+
*/
80+
case DECIMAL:
81+
BigDecimal decimal = (BigDecimal) field;
82+
return decimal.toString();
7183
default:
7284
return field;
7385
}
@@ -122,6 +134,18 @@ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
122134
case MAP:
123135
return convertMap(
124136
(Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
137+
138+
/**
139+
* To solve lost precision and scale of {@link
140+
* org.apache.seatunnel.api.table.type.DecimalType}, create {@link
141+
* java.math.BigDecimal} instance from {@link java.lang.String} type field.
142+
*/
143+
case DECIMAL:
144+
DecimalType decimalType = (DecimalType) dataType;
145+
String decimalData = (String) field;
146+
BigDecimal decimal = new BigDecimal(decimalData);
147+
// BigDecimal is immutable: setScale returns a new instance, so the result must be kept,
// otherwise the declared scale of the DecimalType is never applied to the returned value.
decimal = decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
148+
return decimal;
125149
default:
126150
return field;
127151
}

0 commit comments

Comments
 (0)