Skip to content

Commit 6315092

Browse files
[Improve][Connector-V2] Refactor the structure of file sink to reduce redundant codes (apache#2555)
1 parent 9d88b62 commit 6315092

18 files changed

+1698
-3
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

+36
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.config;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
22+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
23+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
24+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
25+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
2026
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
2127
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
2228
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
@@ -27,30 +33,56 @@
2733

2834
public enum FileFormat implements Serializable {
2935
CSV("csv") {
36+
@Override
37+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
38+
textFileSinkConfig.setFieldDelimiter(",");
39+
return new TextWriteStrategy(textFileSinkConfig);
40+
}
41+
3042
@Override
3143
public ReadStrategy getReadStrategy() {
3244
return new TextReadStrategy();
3345
}
3446
},
3547
TEXT("txt") {
48+
@Override
49+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
50+
return new TextWriteStrategy(textFileSinkConfig);
51+
}
52+
3653
@Override
3754
public ReadStrategy getReadStrategy() {
3855
return new TextReadStrategy();
3956
}
4057
},
4158
PARQUET("parquet") {
59+
@Override
60+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
61+
return new ParquetWriteStrategy(textFileSinkConfig);
62+
}
63+
4264
@Override
4365
public ReadStrategy getReadStrategy() {
4466
return new ParquetReadStrategy();
4567
}
4668
},
4769
ORC("orc") {
70+
@Override
71+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
72+
return new OrcWriteStrategy(textFileSinkConfig);
73+
}
74+
4875
@Override
4976
public ReadStrategy getReadStrategy() {
5077
return new OrcReadStrategy();
5178
}
5279
},
5380
JSON("json") {
81+
@Override
82+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
83+
return new JsonWriteStrategy(textFileSinkConfig);
84+
}
85+
5486
@Override
5587
public ReadStrategy getReadStrategy() {
5688
return new JsonReadStrategy();
@@ -70,4 +102,8 @@ public String getSuffix() {
70102
public ReadStrategy getReadStrategy() {
71103
return null;
72104
}
105+
106+
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
107+
return null;
108+
}
73109
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink;
19+
20+
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.common.SeaTunnelContext;
22+
import org.apache.seatunnel.api.serialization.DefaultSerializer;
23+
import org.apache.seatunnel.api.serialization.Serializer;
24+
import org.apache.seatunnel.api.sink.SeaTunnelSink;
25+
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
26+
import org.apache.seatunnel.api.sink.SinkCommitter;
27+
import org.apache.seatunnel.api.sink.SinkWriter;
28+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
29+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
30+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
31+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
32+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo2;
33+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
34+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter2;
35+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkCommitter2;
36+
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
37+
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
38+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
39+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
40+
41+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
42+
43+
import java.io.IOException;
44+
import java.util.List;
45+
import java.util.Optional;
46+
47+
public abstract class BaseFileSink implements SeaTunnelSink<SeaTunnelRow, FileSinkState2, FileCommitInfo2, FileAggregatedCommitInfo2> {
48+
protected SeaTunnelRowType seaTunnelRowType;
49+
protected Config pluginConfig;
50+
protected HadoopConf hadoopConf;
51+
protected TextFileSinkConfig textFileSinkConfig;
52+
protected WriteStrategy writeStrategy;
53+
protected SeaTunnelContext seaTunnelContext;
54+
protected String jobId;
55+
56+
@Override
57+
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
58+
this.seaTunnelContext = seaTunnelContext;
59+
this.jobId = seaTunnelContext.getJobId();
60+
}
61+
62+
@Override
63+
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
64+
this.seaTunnelRowType = seaTunnelRowType;
65+
this.textFileSinkConfig = new TextFileSinkConfig(pluginConfig, seaTunnelRowType);
66+
this.writeStrategy = WriteStrategyFactory.of(textFileSinkConfig.getFileFormat(), textFileSinkConfig);
67+
this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
68+
}
69+
70+
@Override
71+
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
72+
return seaTunnelRowType;
73+
}
74+
75+
@Override
76+
public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2> restoreWriter(SinkWriter.Context context, List<FileSinkState2> states) throws IOException {
77+
return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
78+
}
79+
80+
@Override
81+
public Optional<SinkCommitter<FileCommitInfo2>> createCommitter() throws IOException {
82+
return Optional.of(new FileSinkCommitter2());
83+
}
84+
85+
@Override
86+
public Optional<SinkAggregatedCommitter<FileCommitInfo2, FileAggregatedCommitInfo2>> createAggregatedCommitter() throws IOException {
87+
return Optional.of(new FileSinkAggregatedCommitter2());
88+
}
89+
90+
@Override
91+
public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2> createWriter(SinkWriter.Context context) throws IOException {
92+
return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
93+
}
94+
95+
@Override
96+
public Optional<Serializer<FileCommitInfo2>> getCommitInfoSerializer() {
97+
return Optional.of(new DefaultSerializer<>());
98+
}
99+
100+
@Override
101+
public Optional<Serializer<FileAggregatedCommitInfo2>> getAggregatedCommitInfoSerializer() {
102+
return Optional.of(new DefaultSerializer<>());
103+
}
104+
105+
@Override
106+
public Optional<Serializer<FileSinkState2>> getWriterStateSerializer() {
107+
return Optional.of(new DefaultSerializer<>());
108+
}
109+
110+
/**
111+
* Use the pluginConfig to do some initialize operation.
112+
*
113+
* @param pluginConfig plugin config.
114+
* @throws PrepareFailException if plugin prepare failed, the {@link PrepareFailException} will throw.
115+
*/
116+
@Override
117+
public void prepare(Config pluginConfig) throws PrepareFailException {
118+
this.pluginConfig = pluginConfig;
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink;
19+
20+
import org.apache.seatunnel.api.sink.SinkWriter;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
23+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
24+
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
25+
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
26+
27+
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Optional;
31+
32+
public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2> {
33+
private final WriteStrategy writeStrategy;
34+
private final HadoopConf hadoopConf;
35+
private final SinkWriter.Context context;
36+
private final int subTaskIndex;
37+
private final String jobId;
38+
39+
public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState2> fileSinkStates) {
40+
this.writeStrategy = writeStrategy;
41+
this.context = context;
42+
this.hadoopConf = hadoopConf;
43+
this.jobId = jobId;
44+
this.subTaskIndex = context.getIndexOfSubtask();
45+
writeStrategy.init(hadoopConf, jobId, subTaskIndex);
46+
if (!fileSinkStates.isEmpty()) {
47+
List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
48+
transactionIds.forEach(writeStrategy::abortPrepare);
49+
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
50+
}
51+
}
52+
53+
public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
54+
this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
55+
}
56+
57+
@Override
58+
public void write(SeaTunnelRow element) throws IOException {
59+
try {
60+
writeStrategy.write(element);
61+
} catch (Exception e) {
62+
throw new RuntimeException("Write data error, please check", e);
63+
}
64+
}
65+
66+
@Override
67+
public Optional<FileCommitInfo2> prepareCommit() throws IOException {
68+
return writeStrategy.prepareCommit();
69+
}
70+
71+
@Override
72+
public void abortPrepare() {
73+
writeStrategy.abortPrepare();
74+
}
75+
76+
@Override
77+
public List<FileSinkState2> snapshotState(long checkpointId) throws IOException {
78+
return writeStrategy.snapshotState(checkpointId);
79+
}
80+
81+
@Override
82+
public void close() throws IOException {
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Data;
22+
23+
import java.io.Serializable;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
@Data
28+
@AllArgsConstructor
29+
public class FileAggregatedCommitInfo2 implements Serializable {
30+
/**
31+
* Storage the commit info in map.
32+
* <p>K is the file path need to be moved to target dir.</p>
33+
* <p>V is the target file path of the data file.</p>
34+
*/
35+
private final Map<String, Map<String, String>> transactionMap;
36+
37+
/**
38+
* Storage the partition information in map.
39+
* <p>K is the partition column's name.</p>
40+
* <p>V is the list of partition column's values.</p>
41+
*/
42+
private final Map<String, List<String>> partitionDirAndValuesMap;
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Data;
22+
23+
import java.io.Serializable;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
@Data
28+
@AllArgsConstructor
29+
public class FileCommitInfo2 implements Serializable {
30+
/**
31+
* Storage the commit info in map.
32+
* <p>K is the file path need to be moved to target dir.</p>
33+
* <p>V is the target file path of the data file.</p>
34+
*/
35+
private final Map<String, String> needMoveFiles;
36+
37+
/**
38+
* Storage the partition information in map.
39+
* <p>K is the partition column's name.</p>
40+
* <p>V is the list of partition column's values.</p>
41+
*/
42+
private final Map<String, List<String>> partitionDirAndValuesMap;
43+
44+
/**
45+
* Storage the transaction directory
46+
*/
47+
private final String transactionDir;
48+
}

0 commit comments

Comments
 (0)