18
18
package org .apache .seatunnel .e2e .connector .hudi ;
19
19
20
20
import org .apache .seatunnel .common .utils .FileUtils ;
21
+ import org .apache .seatunnel .e2e .common .container .seatunnel .SeaTunnelContainer ;
21
22
import org .apache .seatunnel .e2e .common .util .ContainerUtil ;
22
- import org .apache .seatunnel .engine .e2e .SeaTunnelContainer ;
23
23
24
24
import org .apache .hadoop .conf .Configuration ;
25
25
import org .apache .hadoop .fs .LocalFileSystem ;
32
32
import org .junit .jupiter .api .Assertions ;
33
33
import org .junit .jupiter .api .BeforeAll ;
34
34
import org .junit .jupiter .api .Test ;
35
+ import org .junit .jupiter .api .TestInstance ;
35
36
import org .testcontainers .containers .Container ;
36
37
import org .testcontainers .containers .MinIOContainer ;
37
38
38
39
import io .minio .BucketExistsArgs ;
39
- import io .minio .DownloadObjectArgs ;
40
- import io .minio .ListObjectsArgs ;
41
40
import io .minio .MakeBucketArgs ;
42
41
import io .minio .MinioClient ;
43
- import io .minio .Result ;
44
- import io .minio .messages .Item ;
45
42
import lombok .extern .slf4j .Slf4j ;
46
43
47
44
import java .io .IOException ;
51
48
import static org .awaitility .Awaitility .given ;
52
49
53
50
@ Slf4j
51
+ @ TestInstance (TestInstance .Lifecycle .PER_CLASS )
54
52
public class HudiSeatunnelS3MultiTableIT extends SeaTunnelContainer {
55
53
56
54
private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z" ;
@@ -88,7 +86,6 @@ public void startUp() throws Exception {
88
86
89
87
String s3URL = container .getS3URL ();
90
88
91
- // configuringClient
92
89
minioClient =
93
90
MinioClient .builder ()
94
91
.endpoint (s3URL )
@@ -111,11 +108,13 @@ protected String[] buildStartCommand() {
111
108
"wget -P "
112
109
+ SEATUNNEL_HOME
113
110
+ "lib "
111
+ + " --timeout=180 "
114
112
+ AWS_SDK_DOWNLOAD
115
113
+ " &&"
116
114
+ "wget -P "
117
115
+ SEATUNNEL_HOME
118
116
+ "lib "
117
+ + " --timeout=180 "
119
118
+ HADOOP_AWS_DOWNLOAD
120
119
+ " &&"
121
120
+ ContainerUtil .adaptPathForWin (
@@ -132,90 +131,81 @@ public void tearDown() throws Exception {
132
131
}
133
132
}
134
133
134
+ @ Override
135
+ protected boolean isIssueWeAlreadyKnow (String threadName ) {
136
+ return super .isIssueWeAlreadyKnow (threadName )
137
+ // hudi with s3
138
+ || threadName .startsWith ("s3a-transfer" );
139
+ }
140
+
135
141
@ Test
136
142
public void testS3MultiWrite () throws IOException , InterruptedException {
137
143
copyFileToContainer ("/hudi/core-site.xml" , "/tmp/seatunnel/config/core-site.xml" );
138
- Container .ExecResult textWriteResult = executeSeaTunnelJob ("/hudi/s3_fake_to_hudi.conf" );
144
+ Container .ExecResult textWriteResult = executeJob ("/hudi/s3_fake_to_hudi.conf" );
139
145
Assertions .assertEquals (0 , textWriteResult .getExitCode ());
140
146
Configuration configuration = new Configuration ();
141
147
configuration .set ("fs.defaultFS" , LocalFileSystem .DEFAULT_FS );
142
- given ().ignoreExceptions ()
148
+ given ().pollDelay (10 , TimeUnit .SECONDS )
149
+ .pollInterval (1 , TimeUnit .SECONDS )
143
150
.await ()
144
- .atMost (60000 , TimeUnit .MILLISECONDS )
151
+ .atMost (300 , TimeUnit .SECONDS )
145
152
.untilAsserted (
146
153
() -> {
147
154
// copy hudi to local
148
- Path inputPath1 =
149
- downloadNewestCommitFile (DATABASE_1 + "/" + TABLE_NAME_1 + "/" );
150
- Path inputPath2 =
151
- downloadNewestCommitFile (DATABASE_2 + "/" + TABLE_NAME_2 + "/" );
152
- ParquetReader <Group > reader1 =
153
- ParquetReader .builder (new GroupReadSupport (), inputPath1 )
154
- .withConf (configuration )
155
- .build ();
156
- ParquetReader <Group > reader2 =
157
- ParquetReader .builder (new GroupReadSupport (), inputPath2 )
158
- .withConf (configuration )
159
- .build ();
160
-
161
- long rowCount1 = 0 ;
162
- long rowCount2 = 0 ;
163
- // Read data and count rows
164
- while (reader1 .read () != null ) {
165
- rowCount1 ++;
166
- }
167
- // Read data and count rows
168
- while (reader2 .read () != null ) {
169
- rowCount2 ++;
155
+ Path inputPath1 = null ;
156
+ Path inputPath2 = null ;
157
+ try {
158
+ inputPath1 =
159
+ new Path (
160
+ MinIoUtils .downloadNewestCommitFile (
161
+ minioClient ,
162
+ BUCKET ,
163
+ String .format (
164
+ "%s/%s/" , DATABASE_1 , TABLE_NAME_1 ),
165
+ DOWNLOAD_PATH ));
166
+ log .info (
167
+ "download from s3 success, the parquet file is at: {}" ,
168
+ inputPath1 );
169
+ inputPath2 =
170
+ new Path (
171
+ MinIoUtils .downloadNewestCommitFile (
172
+ minioClient ,
173
+ BUCKET ,
174
+ String .format (
175
+ "%s/%s/" , DATABASE_2 , TABLE_NAME_2 ),
176
+ DOWNLOAD_PATH ));
177
+ log .info (
178
+ "download from s3 success, the parquet file is at: {}" ,
179
+ inputPath2 );
180
+ ParquetReader <Group > reader1 =
181
+ ParquetReader .builder (new GroupReadSupport (), inputPath1 )
182
+ .withConf (configuration )
183
+ .build ();
184
+ ParquetReader <Group > reader2 =
185
+ ParquetReader .builder (new GroupReadSupport (), inputPath2 )
186
+ .withConf (configuration )
187
+ .build ();
188
+
189
+ long rowCount1 = 0 ;
190
+ long rowCount2 = 0 ;
191
+ // Read data and count rows
192
+ while (reader1 .read () != null ) {
193
+ rowCount1 ++;
194
+ }
195
+ // Read data and count rows
196
+ while (reader2 .read () != null ) {
197
+ rowCount2 ++;
198
+ }
199
+ Assertions .assertEquals (100 , rowCount1 );
200
+ Assertions .assertEquals (240 , rowCount2 );
201
+ } finally {
202
+ if (inputPath1 != null ) {
203
+ FileUtils .deleteFile (inputPath1 .toUri ().getPath ());
204
+ }
205
+ if (inputPath2 != null ) {
206
+ FileUtils .deleteFile (inputPath2 .toUri ().getPath ());
207
+ }
170
208
}
171
- FileUtils .deleteFile (inputPath1 .toUri ().getPath ());
172
- FileUtils .deleteFile (inputPath2 .toUri ().getPath ());
173
- Assertions .assertEquals (100 , rowCount1 );
174
- Assertions .assertEquals (240 , rowCount2 );
175
209
});
176
210
}
177
-
178
- public Path downloadNewestCommitFile (String pathPrefix ) throws IOException {
179
- Iterable <Result <Item >> listObjects =
180
- minioClient .listObjects (
181
- ListObjectsArgs .builder ().bucket (BUCKET ).prefix (pathPrefix ).build ());
182
- String newestCommitFileabsolutePath = "" ;
183
- String newestCommitFileName = "" ;
184
- long newestCommitTime = 0L ;
185
- for (Result <Item > listObject : listObjects ) {
186
- Item item ;
187
- try {
188
- item = listObject .get ();
189
- } catch (Exception e ) {
190
- throw new IOException ("List minio file error." , e );
191
- }
192
- if (item .isDir () || !item .objectName ().endsWith (".parquet" )) {
193
- continue ;
194
- }
195
- long fileCommitTime =
196
- Long .parseLong (
197
- item .objectName ()
198
- .substring (
199
- item .objectName ().lastIndexOf ("_" ) + 1 ,
200
- item .objectName ().lastIndexOf (".parquet" )));
201
- if (fileCommitTime > newestCommitTime ) {
202
- newestCommitFileabsolutePath = item .objectName ();
203
- newestCommitFileName =
204
- newestCommitFileabsolutePath .substring (
205
- item .objectName ().lastIndexOf ("/" ) + 1 );
206
- newestCommitTime = fileCommitTime ;
207
- }
208
- }
209
- try {
210
- minioClient .downloadObject (
211
- DownloadObjectArgs .builder ()
212
- .bucket (BUCKET )
213
- .object (newestCommitFileabsolutePath )
214
- .filename (DOWNLOAD_PATH + newestCommitFileName )
215
- .build ());
216
- } catch (Exception e ) {
217
- log .error ("Download file from minio error." );
218
- }
219
- return new Path (DOWNLOAD_PATH + newestCommitFileName );
220
- }
221
211
}
0 commit comments