Skip to content

Commit 4a2e272

Browse files
authored
[Feature][Connector-v2] Support streaming read for paimon (apache#7681)
1 parent 4f5d27f commit 4a2e272

File tree

16 files changed

+883
-237
lines changed

16 files changed

+883
-237
lines changed

docs/en/connector-v2/source/Paimon.md

+25-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Read data from Apache Paimon.
99
## Key features
1010

1111
- [x] [batch](../../concept/connector-v2-features.md)
12-
- [ ] [stream](../../concept/connector-v2-features.md)
12+
- [x] [stream](../../concept/connector-v2-features.md)
1313
- [ ] [exactly-once](../../concept/connector-v2-features.md)
1414
- [ ] [column projection](../../concept/connector-v2-features.md)
1515
- [ ] [parallelism](../../concept/connector-v2-features.md)
@@ -157,9 +157,30 @@ source {
157157
```
158158

159159
## Changelog
160+
If you want to read the changelog of this connector, your sink table of paimon which mast has the options named `changelog-producer=input`, then you can refer to [Paimon changelog](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/).
161+
Currently, we only support the `input` and `none` mode of changelog producer. If the changelog producer is `input`, the streaming read of the connector will generate -U,+U,+I,+D data. But if the changelog producer is `none`, the streaming read of the connector will generate +I,+U,+D data.
160162

161-
### next version
163+
### Streaming read example
164+
```hocon
165+
env {
166+
parallelism = 1
167+
job.mode = "Streaming"
168+
}
162169
163-
- Add Paimon Source Connector
164-
- Support projection for Paimon Source
170+
source {
171+
Paimon {
172+
warehouse = "/tmp/paimon"
173+
database = "full_type"
174+
table = "st_test"
175+
}
176+
}
165177
178+
sink {
179+
Paimon {
180+
warehouse = "/tmp/paimon"
181+
database = "full_type"
182+
table = "st_test_sink"
183+
paimon.table.primary-keys = "c_tinyint"
184+
}
185+
}
186+
```

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java

+46-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
1919

20+
import org.apache.seatunnel.api.common.JobContext;
2021
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2122
import org.apache.seatunnel.api.source.Boundedness;
2223
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -26,18 +27,23 @@
2627
import org.apache.seatunnel.api.table.catalog.TablePath;
2728
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2829
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
30+
import org.apache.seatunnel.common.constants.JobMode;
2931
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
3032
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
3133
import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
34+
import org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonBatchSourceSplitEnumerator;
35+
import org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonStreamSourceSplitEnumerator;
3236
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
3337

3438
import org.apache.paimon.predicate.Predicate;
3539
import org.apache.paimon.table.Table;
40+
import org.apache.paimon.table.source.ReadBuilder;
3641
import org.apache.paimon.types.RowType;
3742

3843
import net.sf.jsqlparser.statement.select.PlainSelect;
3944

4045
import java.util.Collections;
46+
import java.util.LinkedList;
4147
import java.util.List;
4248
import java.util.Objects;
4349

@@ -58,12 +64,12 @@ public class PaimonSource
5864

5965
private Table paimonTable;
6066

61-
private Predicate predicate;
62-
63-
private int[] projectionIndex;
67+
private JobContext jobContext;
6468

6569
private CatalogTable catalogTable;
6670

71+
protected final ReadBuilder readBuilder;
72+
6773
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) {
6874
this.readonlyConfig = readonlyConfig;
6975
PaimonSourceConfig paimonSourceConfig = new PaimonSourceConfig(readonlyConfig);
@@ -76,17 +82,22 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog)
7682
PlainSelect plainSelect = convertToPlainSelect(filterSql);
7783
RowType paimonRowType = this.paimonTable.rowType();
7884
String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]);
85+
86+
Predicate predicate = null;
87+
int[] projectionIndex = null;
7988
if (!Objects.isNull(plainSelect)) {
80-
this.projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
89+
projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
8190
if (!Objects.isNull(projectionIndex)) {
8291
this.catalogTable =
8392
paimonCatalog.getTableWithProjection(tablePath, projectionIndex);
8493
}
85-
this.predicate =
94+
predicate =
8695
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
8796
paimonRowType, plainSelect);
8897
}
89-
seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex);
98+
this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex);
99+
this.readBuilder =
100+
paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate);
90101
}
91102

92103
@Override
@@ -99,31 +110,54 @@ public List<CatalogTable> getProducedCatalogTables() {
99110
return Collections.singletonList(catalogTable);
100111
}
101112

113+
@Override
114+
public void setJobContext(JobContext jobContext) {
115+
this.jobContext = jobContext;
116+
}
117+
102118
@Override
103119
public Boundedness getBoundedness() {
104-
return Boundedness.BOUNDED;
120+
return JobMode.BATCH.equals(jobContext.getJobMode())
121+
? Boundedness.BOUNDED
122+
: Boundedness.UNBOUNDED;
105123
}
106124

107125
@Override
108126
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
109127
SourceReader.Context readerContext) throws Exception {
110128
return new PaimonSourceReader(
111-
readerContext, paimonTable, seaTunnelRowType, predicate, projectionIndex);
129+
readerContext, paimonTable, seaTunnelRowType, readBuilder.newRead());
112130
}
113131

114132
@Override
115133
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> createEnumerator(
116134
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext) throws Exception {
117-
return new PaimonSourceSplitEnumerator(
118-
enumeratorContext, paimonTable, predicate, projectionIndex);
135+
if (getBoundedness() == Boundedness.BOUNDED) {
136+
return new PaimonBatchSourceSplitEnumerator(
137+
enumeratorContext, new LinkedList<>(), null, readBuilder.newScan(), 1);
138+
}
139+
return new PaimonStreamSourceSplitEnumerator(
140+
enumeratorContext, new LinkedList<>(), null, readBuilder.newStreamScan(), 1);
119141
}
120142

121143
@Override
122144
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> restoreEnumerator(
123145
SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext,
124146
PaimonSourceState checkpointState)
125147
throws Exception {
126-
return new PaimonSourceSplitEnumerator(
127-
enumeratorContext, paimonTable, checkpointState, predicate, projectionIndex);
148+
if (getBoundedness() == Boundedness.BOUNDED) {
149+
return new PaimonBatchSourceSplitEnumerator(
150+
enumeratorContext,
151+
checkpointState.getAssignedSplits(),
152+
checkpointState.getCurrentSnapshotId(),
153+
readBuilder.newScan(),
154+
1);
155+
}
156+
return new PaimonStreamSourceSplitEnumerator(
157+
enumeratorContext,
158+
checkpointState.getAssignedSplits(),
159+
checkpointState.getCurrentSnapshotId(),
160+
readBuilder.newStreamScan(),
161+
1);
128162
}
129163
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java

+27-20
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
1919

20+
import org.apache.seatunnel.api.source.Boundedness;
2021
import org.apache.seatunnel.api.source.Collector;
2122
import org.apache.seatunnel.api.source.SourceReader;
23+
import org.apache.seatunnel.api.table.type.RowKind;
2224
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2325
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2426
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
27+
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowKindConverter;
2528

2629
import org.apache.paimon.data.InternalRow;
27-
import org.apache.paimon.predicate.Predicate;
2830
import org.apache.paimon.reader.RecordReader;
2931
import org.apache.paimon.reader.RecordReaderIterator;
3032
import org.apache.paimon.table.FileStoreTable;
3133
import org.apache.paimon.table.Table;
34+
import org.apache.paimon.table.source.TableRead;
3235

3336
import lombok.extern.slf4j.Slf4j;
3437

@@ -48,20 +51,14 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour
4851
private final Table table;
4952
private final SeaTunnelRowType seaTunnelRowType;
5053
private volatile boolean noMoreSplit;
51-
private final Predicate predicate;
52-
private int[] projection;
54+
private final TableRead tableRead;
5355

5456
public PaimonSourceReader(
55-
Context context,
56-
Table table,
57-
SeaTunnelRowType seaTunnelRowType,
58-
Predicate predicate,
59-
int[] projection) {
57+
Context context, Table table, SeaTunnelRowType seaTunnelRowType, TableRead tableRead) {
6058
this.context = context;
6159
this.table = table;
6260
this.seaTunnelRowType = seaTunnelRowType;
63-
this.predicate = predicate;
64-
this.projection = projection;
61+
this.tableRead = tableRead;
6562
}
6663

6764
@Override
@@ -81,29 +78,39 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
8178
if (Objects.nonNull(split)) {
8279
// read logic
8380
try (final RecordReader<InternalRow> reader =
84-
table.newReadBuilder()
85-
.withProjection(projection)
86-
.withFilter(predicate)
87-
.newRead()
88-
.executeFilter()
89-
.createReader(split.getSplit())) {
81+
tableRead.executeFilter().createReader(split.getSplit())) {
9082
final RecordReaderIterator<InternalRow> rowIterator =
9183
new RecordReaderIterator<>(reader);
9284
while (rowIterator.hasNext()) {
9385
final InternalRow row = rowIterator.next();
9486
final SeaTunnelRow seaTunnelRow =
9587
RowConverter.convert(
9688
row, seaTunnelRowType, ((FileStoreTable) table).schema());
89+
if (Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
90+
RowKind rowKind =
91+
RowKindConverter.convertPaimonRowKind2SeatunnelRowkind(
92+
row.getRowKind());
93+
if (rowKind != null) {
94+
seaTunnelRow.setRowKind(rowKind);
95+
}
96+
}
9797
output.collect(seaTunnelRow);
9898
}
9999
}
100-
} else if (noMoreSplit && sourceSplits.isEmpty()) {
100+
}
101+
102+
if (noMoreSplit
103+
&& sourceSplits.isEmpty()
104+
&& Boundedness.BOUNDED.equals(context.getBoundedness())) {
101105
// signal to the source that we have reached the end of the data.
102-
log.info("Closed the bounded flink table store source");
106+
log.info("Closed the bounded table store source");
103107
context.signalNoMoreElement();
104108
} else {
105-
log.warn("Waiting for flink table source split, sleeping 1s");
106-
Thread.sleep(1000L);
109+
context.sendSplitRequest();
110+
if (sourceSplits.isEmpty()) {
111+
log.debug("Waiting for table source split, sleeping 1s");
112+
Thread.sleep(1000L);
113+
}
107114
}
108115
}
109116
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@
2121

2222
import org.apache.paimon.table.source.Split;
2323

24+
import lombok.AllArgsConstructor;
25+
import lombok.Getter;
26+
2427
/** Paimon source split, wrapped the {@link Split} of paimon table. */
28+
@Getter
29+
@AllArgsConstructor
2530
public class PaimonSourceSplit implements SourceSplit {
2631
private static final long serialVersionUID = 1L;
2732

28-
private final Split split;
33+
/** The unique ID of the split. Unique within the scope of this source. */
34+
private final String id;
2935

30-
public PaimonSourceSplit(Split split) {
31-
this.split = split;
32-
}
36+
private final Split split;
3337

3438
@Override
3539
public String splitId() {
3640
return split.toString();
3741
}
38-
39-
public Split getSplit() {
40-
return split;
41-
}
4242
}

0 commit comments

Comments
 (0)