Skip to content

Commit 658643a

Browse files
authored
[Improve][Connector-V2] Improve the paimon source (apache#6887)
1 parent 87c6adc commit 658643a

File tree

21 files changed

+1355
-100
lines changed

21 files changed

+1355
-100
lines changed

docs/en/connector-v2/source/Paimon.md

+110-6
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,30 @@ Read data from Apache Paimon.
1717

1818
## Options
1919

20-
| name | type | required | default value |
21-
|----------------|--------|----------|---------------|
22-
| warehouse | String | Yes | - |
23-
| database | String | Yes | - |
24-
| table | String | Yes | - |
25-
| hdfs_site_path | String | No | - |
20+
| name | type | required | default value |
21+
|-------------------------|--------|----------|---------------|
22+
| warehouse | String | Yes | - |
23+
| catalog_type | String | No | filesystem |
24+
| catalog_uri | String | No | - |
25+
| database | String | Yes | - |
26+
| table | String | Yes | - |
27+
| hdfs_site_path | String | No | - |
28+
| query | String | No | - |
29+
| paimon.hadoop.conf | Map | No | - |
30+
| paimon.hadoop.conf-path | String | No | - |
2631

2732
### warehouse [string]
2833

2934
Paimon warehouse path
3035

36+
### catalog_type [string]
37+
38+
Catalog type of Paimon. Supported values are `filesystem` and `hive`.
39+
40+
### catalog_uri [string]
41+
42+
Catalog URI of Paimon, only needed when `catalog_type` is `hive`.
43+
3144
### database [string]
3245

3346
The database you want to access
@@ -40,8 +53,39 @@ The table you want to access
4053

4154
The file path of `hdfs-site.xml`
4255

56+
### query [string]
57+
58+
The filter condition of the table read. For example: `select * from st_test where id > 100`. If not specified, all rows are read.
59+
Currently, where conditions only support `<`, `<=`, `>`, `>=`, `=`, `!=`, `or`, `and`, `is null`, and `is not null`; other operators are not supported.
60+
The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon.
61+
The projection and limit will be supported in the future.
62+
63+
Note: When the field after the where condition is a string or boolean value, its value must be enclosed in single quotes, otherwise an error will be reported. `For example: name='abc' or tag='true'`
64+
The field data types currently supported by where conditions are as follows:
65+
66+
* string
67+
* boolean
68+
* tinyint
69+
* smallint
70+
* int
71+
* bigint
72+
* float
73+
* double
74+
* date
75+
* timestamp
76+
77+
### paimon.hadoop.conf [Map]
78+
79+
Properties in hadoop conf
80+
81+
### paimon.hadoop.conf-path [string]
82+
83+
The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
84+
4385
## Examples
4486

87+
### Simple example
88+
4589
```hocon
4690
source {
4791
Paimon {
@@ -52,6 +96,66 @@ source {
5296
}
5397
```
5498

99+
### Filter example
100+
101+
```hocon
102+
source {
103+
Paimon {
104+
warehouse = "/tmp/paimon"
105+
database = "full_type"
106+
table = "st_test"
107+
query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
108+
}
109+
}
110+
```
111+
112+
### Hadoop conf example
113+
114+
```hocon
115+
source {
116+
Paimon {
117+
catalog_name="seatunnel_test"
118+
warehouse="hdfs:///tmp/paimon"
119+
database="seatunnel_namespace1"
120+
table="st_test"
121+
query = "select * from st_test where pk_id is not null and pk_id < 3"
122+
paimon.hadoop.conf = {
123+
fs.defaultFS = "hdfs://nameservice1"
124+
dfs.nameservices = "nameservice1"
125+
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
126+
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
127+
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
128+
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
129+
dfs.client.use.datanode.hostname = "true"
130+
}
131+
}
132+
}
133+
```
134+
135+
### Hive catalog example
136+
137+
```hocon
138+
source {
139+
Paimon {
140+
catalog_name="seatunnel_test"
141+
catalog_type="hive"
142+
catalog_uri="thrift://hadoop04:9083"
143+
warehouse="hdfs:///tmp/seatunnel"
144+
database="seatunnel_test"
145+
table="st_test3"
146+
paimon.hadoop.conf = {
147+
fs.defaultFS = "hdfs://nameservice1"
148+
dfs.nameservices = "nameservice1"
149+
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
150+
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
151+
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
152+
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
153+
dfs.client.use.datanode.hostname = "true"
154+
}
155+
}
156+
}
157+
```
158+
55159
## Changelog
56160

57161
### next version

seatunnel-connectors-v2/connector-paimon/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@
9090
</exclusion>
9191
</exclusions>
9292
</dependency>
93+
<dependency>
94+
<groupId>com.github.jsqlparser</groupId>
95+
<artifactId>jsqlparser</artifactId>
96+
<version>${jsqlparser.version}</version>
97+
</dependency>
9398

9499
</dependencies>
95100

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
24+
import lombok.Getter;
25+
26+
@Getter
27+
public class PaimonSourceConfig extends PaimonConfig {
28+
29+
public static final Option<String> QUERY_SQL =
30+
Options.key("query")
31+
.stringType()
32+
.noDefaultValue()
33+
.withDescription("The query of paimon source");
34+
35+
private String query;
36+
37+
public PaimonSourceConfig(ReadonlyConfig readonlyConfig) {
38+
super(readonlyConfig);
39+
this.query = readonlyConfig.get(QUERY_SQL);
40+
}
41+
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java

+38-76
Original file line numberDiff line numberDiff line change
@@ -17,130 +17,92 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2421
import org.apache.seatunnel.api.source.Boundedness;
2522
import org.apache.seatunnel.api.source.SeaTunnelSource;
2623
import org.apache.seatunnel.api.source.SourceReader;
2724
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
28-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
26+
import org.apache.seatunnel.api.table.catalog.TablePath;
2927
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3028
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
31-
import org.apache.seatunnel.common.config.CheckConfigUtil;
32-
import org.apache.seatunnel.common.config.CheckResult;
33-
import org.apache.seatunnel.common.constants.PluginType;
34-
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
35-
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
36-
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
37-
38-
import org.apache.hadoop.conf.Configuration;
39-
import org.apache.hadoop.fs.Path;
40-
import org.apache.paimon.catalog.Catalog;
41-
import org.apache.paimon.catalog.CatalogContext;
42-
import org.apache.paimon.catalog.CatalogFactory;
43-
import org.apache.paimon.catalog.Identifier;
44-
import org.apache.paimon.options.Options;
45-
import org.apache.paimon.table.Table;
46-
47-
import com.google.auto.service.AutoService;
29+
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
30+
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
31+
import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
4832

49-
import java.util.HashMap;
50-
import java.util.Map;
33+
import org.apache.paimon.predicate.Predicate;
34+
import org.apache.paimon.table.Table;
5135

52-
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
53-
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
54-
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
55-
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
36+
import java.util.Collections;
37+
import java.util.List;
5638

5739
/** Paimon connector source class. */
58-
@AutoService(SeaTunnelSource.class)
5940
public class PaimonSource
6041
implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> {
6142

6243
private static final long serialVersionUID = 1L;
6344

6445
public static final String PLUGIN_NAME = "Paimon";
6546

66-
private Config pluginConfig;
47+
private ReadonlyConfig readonlyConfig;
6748

6849
private SeaTunnelRowType seaTunnelRowType;
6950

70-
private Table table;
51+
private Table paimonTable;
52+
53+
private Predicate predicate;
54+
55+
private CatalogTable catalogTable;
56+
57+
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) {
58+
this.readonlyConfig = readonlyConfig;
59+
PaimonSourceConfig paimonSourceConfig = new PaimonSourceConfig(readonlyConfig);
60+
TablePath tablePath =
61+
TablePath.of(paimonSourceConfig.getNamespace(), paimonSourceConfig.getTable());
62+
this.catalogTable = paimonCatalog.getTable(tablePath);
63+
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
64+
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
65+
// TODO: We can use this to realize the column projection feature later
66+
String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL);
67+
this.predicate =
68+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
69+
this.paimonTable.rowType(), filterSql);
70+
}
7171

7272
@Override
7373
public String getPluginName() {
7474
return PLUGIN_NAME;
7575
}
7676

77-
@Override
78-
public void prepare(Config pluginConfig) throws PrepareFailException {
79-
this.pluginConfig = pluginConfig;
80-
final CheckResult result =
81-
CheckConfigUtil.checkAllExists(
82-
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
83-
if (!result.isSuccess()) {
84-
throw new PaimonConnectorException(
85-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
86-
String.format(
87-
"PluginName: %s, PluginType: %s, Message: %s",
88-
getPluginName(), PluginType.SOURCE, result.getMsg()));
89-
}
90-
// initialize paimon table
91-
final String warehouse = pluginConfig.getString(WAREHOUSE.key());
92-
final String database = pluginConfig.getString(DATABASE.key());
93-
final String table = pluginConfig.getString(TABLE.key());
94-
final Map<String, String> optionsMap = new HashMap<>();
95-
optionsMap.put(WAREHOUSE.key(), warehouse);
96-
final Options options = Options.fromMap(optionsMap);
97-
final Configuration hadoopConf = new Configuration();
98-
if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
99-
hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
100-
}
101-
final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
102-
try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
103-
Identifier identifier = Identifier.create(database, table);
104-
this.table = catalog.getTable(identifier);
105-
} catch (Exception e) {
106-
String errorMsg =
107-
String.format(
108-
"Failed to get table [%s] from database [%s] on warehouse [%s]",
109-
database, table, warehouse);
110-
throw new PaimonConnectorException(
111-
PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
112-
}
113-
// TODO: Support column projection
114-
seaTunnelRowType = RowTypeConverter.convert(this.table.rowType());
115-
}
116-
11777
@Override
11878
public Boundedness getBoundedness() {
11979
return Boundedness.BOUNDED;
12080
}
12181

12282
@Override
123-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
124-
return seaTunnelRowType;
83+
public List<CatalogTable> getProducedCatalogTables() {
84+
return Collections.singletonList(catalogTable);
12585
}
12686

12787
@Override
12888
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
12989
SourceReader.Context readerContext) throws Exception {
130-
return new PaimonSourceReader(readerContext, table, seaTunnelRowType);
90+
91+
return new PaimonSourceReader(readerContext, paimonTable, seaTunnelRowType, predicate);
13192
}
13293

13394
@Override
13495
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> createEnumerator(
13596
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext) throws Exception {
136-
return new PaimonSourceSplitEnumerator(enumeratorContext, table);
97+
return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, predicate);
13798
}
13899

139100
@Override
140101
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> restoreEnumerator(
141102
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext,
142103
PaimonSourceState checkpointState)
143104
throws Exception {
144-
return new PaimonSourceSplitEnumerator(enumeratorContext, table, checkpointState);
105+
return new PaimonSourceSplitEnumerator(
106+
enumeratorContext, paimonTable, checkpointState, predicate);
145107
}
146108
}

0 commit comments

Comments
 (0)