Skip to content

Commit f73b372

Browse files
zck573693104zckEricJoy2048
authored
[Feature] [File Connector] Supports writing column names when the output type is file (CSV) (apache#5459)
* [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443 * [Feature] [File Connector] fix code style and lineSeparator apache#5443 * [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443 * [Feature] [File Connector] fix code style apache#5443 * [Feature] [File Connector] add enable_header_write explain apache#5443 * [Feature] [File Connector]fix code style apache#5443 * Update docs/en/connector-v2/sink/LocalFile.md * Update docs/en/connector-v2/sink/LocalFile.md * [Feature] [File Connector]fix code style apache#5443 * [Feature] [File Connector]add junit test apache#5443 * [Feature] [File Connector]add license header: apache#5443 * [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443 * [Feature] [File Connector] fix code style and lineSeparator apache#5443 * [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443 * [Feature] [File Connector] fix code style apache#5443 * [Feature] [File Connector] add enable_header_write explain apache#5443 * [Feature] [File Connector]fix code style apache#5443 * Update docs/en/connector-v2/sink/LocalFile.md * Update docs/en/connector-v2/sink/LocalFile.md * [Feature] [File Connector]fix code style apache#5443 * [Feature] [File Connector]add junit test apache#5443 * [Feature] [File Connector]add license header: apache#5443 * [Feature] [File Connector]add junit: apache#5443 * [Feature] [File Connector]add junit: apache#5443 * [Feature] [File Connector]remove scala: apache#5443 * [Feature] [File Connector]modify md style: apache#5443 * [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443 * [Feature] [File Connector] fix code style and lineSeparator apache#5443 * [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443 * [Feature] [File Connector] fix code style apache#5443 * [Feature] [File Connector] add enable_header_write explain apache#5443 * [Feature] [File Connector]fix code style apache#5443 * Update docs/en/connector-v2/sink/LocalFile.md * Update docs/en/connector-v2/sink/LocalFile.md * [Feature] [File Connector]fix code style apache#5443 * [Feature] [File Connector]add junit test apache#5443 * [Feature] [File Connector]add license header: apache#5443 * [Feature] [File Connector]add junit: apache#5443 * [Feature] [File Connector]add junit: apache#5443 * [Feature] [File Connector]remove scala: apache#5443 * [Feature] [File Connector]modify md style: apache#5443 * [Feature] [File Connector]junit modify: apache#5443 --------- Co-authored-by: zck <[email protected]> Co-authored-by: Eric <[email protected]>
1 parent 7348be2 commit f73b372

File tree

6 files changed

+296
-20
lines changed

6 files changed

+296
-20
lines changed

docs/en/connector-v2/sink/LocalFile.md

+25-20
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`
3030

3131
## Options
3232

33-
| name | type | required | default value | remarks |
34-
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
35-
| path | string | yes | - | |
36-
| custom_filename | boolean | no | false | Whether you need custom the filename |
37-
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
38-
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
39-
| file_format_type | string | no | "csv" | |
40-
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
41-
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
42-
| have_partition | boolean | no | false | Whether you need processing partitions. |
43-
| partition_by | array | no | - | Only used then have_partition is true |
44-
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
45-
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
46-
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
47-
| is_enable_transaction | boolean | no | true | |
48-
| batch_size | int | no | 1000000 | |
49-
| compress_codec | string | no | none | |
50-
| common-options | object | no | - | |
51-
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
52-
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
33+
| name | type | required | default value | remarks |
34+
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------|
35+
| path | string | yes | - | |
36+
| custom_filename | boolean | no | false | Whether you need custom the filename |
37+
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
38+
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
39+
| file_format_type | string | no | "csv" | |
40+
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
41+
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
42+
| have_partition | boolean | no | false | Whether you need processing partitions. |
43+
| partition_by | array | no | - | Only used then have_partition is true |
44+
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
45+
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
46+
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
47+
| is_enable_transaction | boolean | no | true | |
48+
| batch_size | int | no | 1000000 | |
49+
| compress_codec | string | no | none | |
50+
| common-options | object | no | - | |
51+
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
52+
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
53+
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.<br/> false:don't write header,true:write header. |
5354

5455
### path [string]
5556

@@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in
166167

167168
Writer the sheet of the workbook
168169

170+
### enable_header_write [boolean]
171+
172+
Only used when file_format_type is text,csv.false:don't write header,true:write header.
173+
169174
## Example
170175

171176
For orc file format simple config

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
4646
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
4747
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
4848
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
49+
protected Boolean enableHeaderWriter = false;
4950

5051
public BaseFileSinkConfig(@NonNull Config config) {
5152
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
@@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
99100
timeFormat =
100101
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
101102
}
103+
104+
if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
105+
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
106+
}
102107
}
103108

104109
public BaseFileSinkConfig() {}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,10 @@ public class BaseSinkConfig {
232232
.stringType()
233233
.noDefaultValue()
234234
.withDescription("To be written sheet name,only valid for excel files");
235+
236+
public static final Option<Boolean> ENABLE_HEADER_WRITE =
237+
Options.key("enable_header_write")
238+
.booleanType()
239+
.defaultValue(false)
240+
.withDescription("false:dont write header,true:write header");
235241
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java

+19
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.common.utils.DateTimeUtils;
2525
import org.apache.seatunnel.common.utils.DateUtils;
2626
import org.apache.seatunnel.common.utils.TimeUtils;
27+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2728
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
2829
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
2930
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
4748
private final DateUtils.Formatter dateFormat;
4849
private final DateTimeUtils.Formatter dateTimeFormat;
4950
private final TimeUtils.Formatter timeFormat;
51+
private final FileFormat fileFormat;
52+
private final Boolean enableHeaderWriter;
5053
private SerializationSchema serializationSchema;
5154

5255
public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
5861
this.dateFormat = fileSinkConfig.getDateFormat();
5962
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
6063
this.timeFormat = fileSinkConfig.getTimeFormat();
64+
this.fileFormat = fileSinkConfig.getFileFormat();
65+
this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
6166
}
6267

6368
@Override
@@ -133,15 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
133138
OutputStream out =
134139
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
135140
fsDataOutputStream = new FSDataOutputStream(out, null);
141+
enableWriteHeader(fsDataOutputStream);
136142
break;
137143
case NONE:
138144
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
145+
enableWriteHeader(fsDataOutputStream);
139146
break;
140147
default:
141148
log.warn(
142149
"Text file does not support this compress type: {}",
143150
compressFormat.getCompressCodec());
144151
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
152+
enableWriteHeader(fsDataOutputStream);
145153
break;
146154
}
147155
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
@@ -155,4 +163,15 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
155163
}
156164
return fsDataOutputStream;
157165
}
166+
167+
private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
168+
if (enableHeaderWriter) {
169+
fsDataOutputStream.write(
170+
String.join(
171+
FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter,
172+
seaTunnelRowType.getFieldNames())
173+
.getBytes());
174+
fsDataOutputStream.write(rowDelimiter.getBytes());
175+
}
176+
}
158177
}

0 commit comments

Comments
 (0)