Skip to content

Commit

Permalink
[FLINK-32129][fs-connector] Use string array in PartitionCommitInfo
Browse files Browse the repository at this point in the history
Using generics will throw exception when 'pipeline.generic-types' is disabled

Close apache#22609
  • Loading branch information
FangYongs authored and libenchao committed May 22, 2023
1 parent 9dfde1c commit 3cbacbf
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;

import java.io.Serializable;
import java.util.List;

/**
* The message sent by upstream.
Expand All @@ -37,12 +36,12 @@ public class PartitionCommitInfo implements Serializable {
private long checkpointId;
private int taskId;
private int numberOfTasks;
private List<String> partitions;
private String[] partitions;

public PartitionCommitInfo() {}

public PartitionCommitInfo(
long checkpointId, int taskId, int numberOfTasks, List<String> partitions) {
long checkpointId, int taskId, int numberOfTasks, String[] partitions) {
this.checkpointId = checkpointId;
this.taskId = taskId;
this.numberOfTasks = numberOfTasks;
Expand Down Expand Up @@ -73,11 +72,11 @@ public void setNumberOfTasks(int numberOfTasks) {
this.numberOfTasks = numberOfTasks;
}

public List<String> getPartitions() {
public String[] getPartitions() {
return partitions;
}

public void setPartitions(List<String> partitions) {
public void setPartitions(String[] partitions) {
this.partitions = partitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -152,6 +151,6 @@ protected void commitUpToCheckpoint(long checkpointId) throws Exception {
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
new ArrayList<>(partitions))));
partitions.toArray(new String[0]))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void endCompaction(long checkpoint) {
checkpoint,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
new ArrayList<>(this.partitions))));
this.partitions.toArray(new String[0]))));
this.partitions.clear();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.table.stream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;

/** Tests for partition commit info. */
public class PartitionCommitInfoTest {
@Test
public void testPartitionCommitSerializer() {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableGenericTypes();
assertNotNull(
TypeInformation.of(PartitionCommitInfo.class).createSerializer(executionConfig));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -327,7 +328,7 @@ private static RowData row(String s) {
private static List<String> collect(
OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness) {
List<String> parts = new ArrayList<>();
harness.extractOutputValues().forEach(m -> parts.addAll(m.getPartitions()));
harness.extractOutputValues().forEach(m -> parts.addAll(Arrays.asList(m.getPartitions())));
return parts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void testCompactOperator() throws Exception {
List<PartitionCommitInfo> outputs = harness.extractOutputValues();
assertThat(outputs).hasSize(1);
assertThat(outputs.get(0).getCheckpointId()).isEqualTo(1);
assertThat(outputs.get(0).getPartitions()).isEqualTo(Arrays.asList("p0", "p1"));
assertThat(outputs.get(0).getPartitions()).isEqualTo(new String[] {"p0", "p1"});

// check all compacted file generated
assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
Expand Down

0 comments on commit 3cbacbf

Please sign in to comment.