diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java index 3b406534c28a7..1a157699d4fb7 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java @@ -176,7 +176,6 @@ private void appendCompactingResultsToSummary(CommittableSummary> results = assertThat(harness.extractOutputValues()).hasSize(4); - results.element(0, as(committableSummary())).hasPendingCommittables(3); results.element(1, as(committableWithLineage())) .hasCommittable(committable("0", "compacted-0", 10)); results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0")); @@ -140,7 +139,6 @@ void testPassthrough() throws Exception { ListAssert> messages = assertThat(harness.extractOutputValues()).hasSize(3); - messages.element(0, as(committableSummary())).hasPendingCommittables(2); messages.element(1, as(committableWithLineage())) .hasCommittable(cleanupInprogressRequest); messages.element(2, as(committableWithLineage())).hasCommittable(cleanupPathRequest); @@ -207,13 +205,13 @@ void testRestore() throws Exception { // 1summary+1compacted+2cleanup * 2 ListAssert> results = assertThat(harness.extractOutputValues()).hasSize(8); - results.element(0, as(committableSummary())).hasPendingCommittables(3); + results.element(0, as(committableSummary())); results.element(1, as(committableWithLineage())) .hasCommittable(committable("0", "compacted-0", 10)); results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0")); results.element(3, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1")); - results.element(4, as(committableSummary())).hasPendingCommittables(3); + results.element(4, as(committableSummary())); results.element(5, as(committableWithLineage())) .hasCommittable(committable("0", "compacted-2", 10)); results.element(6, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2")); @@ -311,7 +309,7 @@ void testStateHandler() throws Exception { // + 1 summary + 1 cleanup + 1 summary ListAssert> results = assertThat(harness.extractOutputValues()).hasSize(18); - results.element(0, as(committableSummary())).hasPendingCommittables(14); + results.element(0, as(committableSummary())); List expectedResult = Arrays.asList( @@ -335,11 +333,11 @@ void testStateHandler() throws Exception { .hasCommittable(expectedResult.get(i)); } - results.element(15, as(committableSummary())).hasPendingCommittables(1); + results.element(15, as(committableSummary())); results.element(16, as(committableWithLineage())) .hasCommittable(cleanupPath("0", ".6")); - results.element(17, as(committableSummary())).hasPendingCommittables(3); + results.element(17, as(committableSummary())); } } @@ -378,7 +376,7 @@ void testStateHandlerRestore() throws Exception { ListAssert> results = assertThat(harness.extractOutputValues()).hasSize(3); - results.element(0, as(committableSummary())).hasPendingCommittables(4); + results.element(0, as(committableSummary())); results.element(1, as(committableWithLineage())) .hasCommittable(committable("0", "compacted-1", 1)); results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1")); @@ -437,7 +435,7 @@ void testStateHandlerRestore() throws Exception { ListAssert> results = assertThat(harness.extractOutputValues()).hasSize(2); - results.element(0, as(committableSummary())).hasPendingCommittables(1); + results.element(0, as(committableSummary())); results.element(1, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2")); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java index 7f1fb0038060a..66af405965bbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java @@ -39,11 +39,29 @@ public class CommittableSummary implements CommittableMessage { private final long checkpointId; /** The number of committables coming from the given subtask in the particular checkpoint. */ private final int numberOfCommittables; + + @Deprecated /** The number of committables that have not been successfully committed. */ private final int numberOfPendingCommittables; /** The number of committables that are not retried and have been failed. */ private final int numberOfFailedCommittables; + public CommittableSummary( + int subtaskId, + int numberOfSubtasks, + long checkpointId, + int numberOfCommittables, + int numberOfFailedCommittables) { + this( + subtaskId, + numberOfSubtasks, + checkpointId, + numberOfCommittables, + 0, + numberOfFailedCommittables); + } + + @Deprecated public CommittableSummary( int subtaskId, int numberOfSubtasks, @@ -75,8 +93,9 @@ public int getNumberOfCommittables() { return numberOfCommittables; } + @Deprecated public int getNumberOfPendingCommittables() { - return numberOfPendingCommittables; + return 0; } public int getNumberOfFailedCommittables() { @@ -89,7 +108,6 @@ public CommittableSummary map() { numberOfSubtasks, checkpointId, numberOfCommittables, - numberOfPendingCommittables, numberOfFailedCommittables); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index b6c93e33089b9..3a2c221a5eaee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -196,8 +196,7 @@ private void emit(CheckpointCommittableManager committableManager) { numberOfSubtasks, checkpointId, committables.size(), - 0, - 0))); + committableManager.getNumFailed()))); for (CommT committable : committables) { output.collect( new StreamRecord<>( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java index 4e34fbbe698c4..74d7b6250496a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java @@ -66,5 +66,24 @@ public interface CheckpointCommittableManager { void commit(Committer committer, int maxRetries) throws IOException, InterruptedException; + /** + * Returns the number of committables that have been successfully committed; that is, the + * corresponding {@link org.apache.flink.api.connector.sink2.Committer.CommitRequest} was not + * used to signal an error of any kind (retryable or not). + * + * @return number of successful committables + */ Collection getSuccessfulCommittables(); + + /** + * Returns the number of committables that have failed with a known error. By the current + * semantics of {@link + * org.apache.flink.api.connector.sink2.Committer.CommitRequest#signalFailedWithKnownReason(Throwable)} + * discards the committable but proceeds processing. The returned number should be emitted + * downstream in a {@link org.apache.flink.streaming.api.connector.sink2.CommittableSummary}, + * such that downstream can assess if all committables have been processed. + * + * @return number of failed committables + */ + int getNumFailed(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 16a4a35bb4f56..6aa7401a00a42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -166,6 +166,13 @@ public Collection getSuccessfulCommittables() { .collect(Collectors.toList()); } + @Override + public int getNumFailed() { + return subtasksCommittableManagers.values().stream() + .mapToInt(SubtaskCommittableManager::getNumFailed) + .sum(); + } + Stream> getPendingRequests() { return subtasksCommittableManagers.values().stream() .peek(this::assertReceivedAll) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java index f41350bd25866..7a9121b0b650e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java @@ -207,7 +207,7 @@ public SubtaskSimpleVersionedSerializer() { @Override public int getVersion() { - return 1; + return 2; } @Override @@ -219,7 +219,6 @@ public byte[] serialize(SubtaskCommittableManager subtask) throws IOExcep new ArrayList<>(subtask.getRequests()), out); out.writeInt(subtask.getNumCommittables()); - out.writeInt(subtask.getNumDrained()); out.writeInt(subtask.getNumFailed()); return out.getCopyOfBuffer(); } @@ -236,7 +235,7 @@ public SubtaskCommittableManager deserialize(int version, byte[] serializ return new SubtaskCommittableManager<>( requests, in.readInt(), - in.readInt(), + version >= 2 ? 0 : in.readInt(), in.readInt(), subtaskId, checkNotNull( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index 759f98e2bb87f..ffda5913465fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -25,12 +25,9 @@ import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Deque; -import java.util.Iterator; -import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,7 +43,7 @@ class SubtaskCommittableManager { private final int numExpectedCommittables; private final long checkpointId; private final int subtaskId; - private int numDrained; + @Deprecated private int numDrained; private int numFailed; private final SinkCommitterMetricGroup metricGroup; @@ -65,6 +62,17 @@ class SubtaskCommittableManager { metricGroup); } + SubtaskCommittableManager( + Collection> requests, + int numExpectedCommittables, + int numFailed, + int subtaskId, + long checkpointId, + SinkCommitterMetricGroup metricGroup) { + this(requests, numExpectedCommittables, 0, numFailed, subtaskId, checkpointId, metricGroup); + } + + @Deprecated SubtaskCommittableManager( Collection> requests, int numExpectedCommittables, @@ -98,7 +106,7 @@ void add(CommT committable) { * @return if all committables have been received */ boolean hasReceivedAll() { - return getNumCommittables() == numExpectedCommittables; + return getNumCommittables() == numExpectedCommittables + numFailed; } /** @@ -107,18 +115,7 @@ boolean hasReceivedAll() { * @return number of so far received committables */ int getNumCommittables() { - return requests.size() + numDrained + numFailed; - } - - /** - * Returns the number of still expected commits. - * - *

Either the committables are not yet received or the commit is still pending. - * - * @return number of still expected committables - */ - int getNumPending() { - return numExpectedCommittables - (numDrained + numFailed); + return requests.size(); } int getNumFailed() { @@ -126,7 +123,7 @@ int getNumFailed() { } boolean isFinished() { - return getNumPending() == 0; + return getPendingRequests().findAny().isEmpty(); } /** @@ -145,43 +142,6 @@ Stream getSuccessfulCommittables() { .map(CommitRequestImpl::getCommittable); } - /** - * Iterates through all currently registered {@link #requests} and returns all {@link - * CommittableWithLineage} that could be successfully committed. - * - *

Invoking this method does not yield the same {@link CommittableWithLineage} again. Once - * retrieved they are not part of {@link #requests} anymore. - * - * @return list of {@link CommittableWithLineage} - */ - List> drainCommitted() { - List> committed = new ArrayList<>(requests.size()); - for (Iterator> iterator = requests.iterator(); - iterator.hasNext(); ) { - CommitRequestImpl request = iterator.next(); - if (!request.isFinished()) { - continue; - } - if (request.getState() == CommitRequestState.FAILED) { - numFailed += 1; - iterator.remove(); - continue; - } else { - committed.add( - new CommittableWithLineage<>( - request.getCommittable(), checkpointId, subtaskId)); - } - iterator.remove(); - } - - numDrained += committed.size(); - return committed; - } - - int getNumDrained() { - return numDrained; - } - int getSubtaskId() { return subtaskId; } @@ -202,7 +162,6 @@ SubtaskCommittableManager merge(SubtaskCommittableManager other) { Stream.concat(requests.stream(), other.requests.stream()) .collect(Collectors.toList()), numExpectedCommittables + other.numExpectedCommittables, - numDrained + other.numDrained, numFailed + other.numFailed, subtaskId, checkpointId, @@ -213,7 +172,6 @@ SubtaskCommittableManager copy() { return new SubtaskCommittableManager<>( requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList()), numExpectedCommittables, - numDrained, numFailed, subtaskId, checkpointId, @@ -254,8 +212,6 @@ public String toString() { + checkpointId + ", subtaskId=" + subtaskId - + ", numDrained=" - + numDrained + ", numFailed=" + numFailed + '}'; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java index 4c7b0d7096cf2..0f84c1c12008f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java @@ -47,7 +47,7 @@ void testCommittableWithLinageSerDe() throws IOException { @Test void testCommittableSummarySerDe() throws IOException { final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, 3L, 4, 5, 6); + new CommittableSummary<>(1, 2, 3L, 4, 5); final CommittableMessage message = SERIALIZER.deserialize( CommittableMessageSerializer.VERSION, @@ -58,7 +58,5 @@ void testCommittableSummarySerDe() throws IOException { assertThat(copy.getNumberOfSubtasks()).isEqualTo(2); assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(3L); assertThat(copy.getNumberOfCommittables()).isEqualTo(4); - assertThat(copy.getNumberOfPendingCommittables()).isEqualTo(5); - assertThat(copy.getNumberOfFailedCommittables()).isEqualTo(6); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java index 14416da2d99a1..2e22b445e4bed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java @@ -40,10 +40,6 @@ public CommittableSummaryAssert hasOverallCommittables(int committableNum return returns(committableNumber, CommittableSummary::getNumberOfCommittables); } - public CommittableSummaryAssert hasPendingCommittables(int committableNumber) { - return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables); - } - public CommittableSummaryAssert hasFailedCommittables(int committableNumber) { return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java index 641a651e2e406..5dc646240fcd4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java @@ -48,7 +48,7 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting(boolean commitOnI long cid = 1L; testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0, 0))); + new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0))); testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(1, cid, 1))); @@ -63,7 +63,6 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting(boolean commitOnI if (commitOnInput) { assertThat(committer.committed).containsExactly(1, 2); } else { - // 3PC behavior assertThat(committer.committed).isEmpty(); testHarness.notifyOfCompletedCheckpoint(cid + 1); assertThat(committer.committed).containsExactly(1, 2); @@ -83,7 +82,7 @@ void testWaitForNotifyCheckpointCompleted(boolean commitOnInput) throws Exceptio long cid = 1L; testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0, 0))); + new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0))); testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(1, cid, 1))); @@ -116,7 +115,7 @@ void testStateRestore() throws Exception { testHarness.open(); final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 0L, 1, 1, 0); + new CommittableSummary<>(1, 1, 0L, 1, 1); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableWithLineage first = new CommittableWithLineage<>(1, 0L, 1); testHarness.processElement(new StreamRecord<>(first)); @@ -147,10 +146,10 @@ void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Ex testHarness.open(); final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); + new CommittableSummary<>(1, 2, EOI, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); + new CommittableSummary<>(2, 2, EOI, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary2)); final CommittableWithLineage first = new CommittableWithLineage<>(1, EOI, 1); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java index 12a5644fe718f..8c5cc11b9051b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java @@ -70,7 +70,6 @@ void testSerDe(boolean withSinkV1State) throws IOException { final GlobalCommittableWrapper copy = serializer.deserialize(2, serializer.serialize(wrapper)); assertThat(copy.getGlobalCommittables()).containsExactlyInAnyOrderElementsOf(v1State); - assertThat(collector).returns(false, CommittableCollector::isFinished); assertThat(collector.getCheckpointCommittablesUpTo(2)) .singleElement() .returns(1L, CheckpointCommittableManager::getCheckpointId) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index 56804ce9f2b0d..5b79a88f3291a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -48,17 +48,15 @@ void testAddSummary() { new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L, METRIC_GROUP); assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty(); - final CommittableSummary first = new CommittableSummary<>(1, 1, 1L, 1, 0, 0); + final CommittableSummary first = new CommittableSummary<>(1, 1, 1L, 1, 0); checkpointCommittables.addSummary(first); assertThat(checkpointCommittables.getSubtaskCommittableManagers()) .singleElement() .returns(1, SubtaskCommittableManager::getSubtaskId) - .returns(1L, SubtaskCommittableManager::getCheckpointId) - .returns(1, SubtaskCommittableManager::getNumPending) - .returns(0, SubtaskCommittableManager::getNumFailed); + .returns(1L, SubtaskCommittableManager::getCheckpointId); // Add different subtask id - final CommittableSummary third = new CommittableSummary<>(2, 1, 2L, 2, 1, 1); + final CommittableSummary third = new CommittableSummary<>(2, 1, 2L, 2, 1); checkpointCommittables.addSummary(third); assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2); } @@ -67,8 +65,8 @@ void testAddSummary() { void testCommit() throws IOException, InterruptedException { final CheckpointCommittableManagerImpl checkpointCommittables = new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L, METRIC_GROUP); - checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); - checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 2, 0)); checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 1L, 1)); checkpointCommittables.addCommittable(new CommittableWithLineage<>(4, 1L, 2)); @@ -95,11 +93,11 @@ void testCommit() throws IOException, InterruptedException { void testUpdateCommittableSummary() { final CheckpointCommittableManagerImpl checkpointCommittables = new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L, METRIC_GROUP); - checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0)); + checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0)); assertThatThrownBy( () -> checkpointCommittables.addSummary( - new CommittableSummary<>(1, 1, 1L, 2, 0, 0))) + new CommittableSummary<>(1, 1, 1L, 2, 0))) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("FLINK-25920"); } @@ -114,7 +112,7 @@ public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) { new CheckpointCommittableManagerImpl<>( new HashMap<>(), numberOfSubtasks, checkpointId, METRIC_GROUP); original.addSummary( - new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0, 0)); + new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0)); CheckpointCommittableManagerImpl copy = original.copy(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java index ed33daed844b4..938427ba2d51c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java @@ -63,8 +63,6 @@ void testCommittableCollectorV1SerDe() throws IOException { final CommittableCollector committableCollector = SERIALIZER.deserialize(1, serialized); - assertThat(committableCollector.isFinished()).isFalse(); - assertThat(committableCollector.getCheckpointCommittables()) .singleElement() .extracting( @@ -89,18 +87,15 @@ void testCommittableCollectorV2SerDe() throws IOException { final CommittableCollector committableCollector = new CommittableCollector<>(METRIC_GROUP); committableCollector.addMessage( - new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1, 1, 0)); + new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1, 0)); committableCollector.addMessage( - new CommittableSummary<>(subtaskId, numberOfSubtasks, 2L, 1, 1, 0)); + new CommittableSummary<>(subtaskId, numberOfSubtasks, 2L, 1, 0)); committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, subtaskId)); committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, subtaskId)); final CommittableCollector copy = ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector)); - // Expect the subtask Id equal to the origin of the collector - assertThat(copy.isFinished()).isFalse(); - // assert original CommittableCollector assertCommittableCollector( "Original CommittableCollector", @@ -140,9 +135,6 @@ void testCommittablesForSameSubtaskIdV2SerDe() throws IOException { final CommittableCollector copy = ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector)); - // Expect the subtask Id equal to the origin of the collector - assertThat(copy.isFinished()).isFalse(); - // assert original CommittableCollector assertCommittableCollector( "Original CommittableCollector", diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 6e55adcc0c572..6d311170d4741 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -37,11 +37,11 @@ class CommittableCollectorTest { void testGetCheckpointCommittablesUpTo() { final CommittableCollector committableCollector = new CommittableCollector<>(METRIC_GROUP); - CommittableSummary first = new CommittableSummary<>(1, 1, 1L, 1, 0, 0); + CommittableSummary first = new CommittableSummary<>(1, 1, 1L, 1, 0); committableCollector.addMessage(first); - CommittableSummary second = new CommittableSummary<>(1, 1, 2L, 1, 0, 0); + CommittableSummary second = new CommittableSummary<>(1, 1, 2L, 1, 0); committableCollector.addMessage(second); - committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0, 0)); + committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0)); assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2); @@ -52,7 +52,7 @@ void testGetCheckpointCommittablesUpTo() { void testGetEndOfInputCommittable() { final CommittableCollector committableCollector = new CommittableCollector<>(METRIC_GROUP); - CommittableSummary first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0); + CommittableSummary first = new CommittableSummary<>(1, 1, EOI, 1, 0); committableCollector.addMessage(first); Optional> endOfInputCommittable = diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java index 251df7ac1bbef..70fe1a6504938 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -30,8 +29,6 @@ import java.util.Iterator; import java.util.stream.IntStream; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; class SubtaskCommittableManagerTest { @@ -39,7 +36,7 @@ class SubtaskCommittableManagerTest { MetricsGroupTestUtils.mockCommitterMetricGroup(); @Test - void testDrainCommittables() { + void testSuccessfulCommittables() { final SubtaskCommittableManager subtaskCommittableManager = new SubtaskCommittableManager<>(3, 1, 1L, METRIC_GROUP); final CommittableWithLineage first = new CommittableWithLineage<>(1, 1L, 1); @@ -54,7 +51,6 @@ void testDrainCommittables() { subtaskCommittableManager.add(third); assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(3); assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3); - assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(0); assertThat(subtaskCommittableManager.isFinished()).isFalse(); // Trigger commit @@ -63,36 +59,18 @@ void testDrainCommittables() { IntStream.range(0, 2).forEach(i -> requests.next().setCommittedIfNoError()); assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(1); assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3); - assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(0); - // Drain committed committables - ListAssert> committables = - assertThat(subtaskCommittableManager.drainCommitted()).hasSize(2); - committables - .element(0, as(committableWithLineage())) - .hasSubtaskId(1) - .hasCommittable(1) - .hasCheckpointId(1); - committables - .element(1, as(committableWithLineage())) - .hasSubtaskId(1) - .hasCommittable(2) - .hasCheckpointId(1); - assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0); - - // Drain again should not yield anything - assertThat(subtaskCommittableManager.drainCommitted()).hasSize(0); + // Check the successful committables + assertThat(subtaskCommittableManager.getSuccessfulCommittables()) + .containsExactlyInAnyOrder(1, 2); // Fail commit requests.next().signalFailedWithKnownReason(new RuntimeException("Unused exception")); - assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0); assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0); assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(3); - assertThat(subtaskCommittableManager.isFinished()).isFalse(); - - // Drain to update fail count - assertThat(subtaskCommittableManager.drainCommitted()).hasSize(0); - assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(1); + // doesn't change the successful committables + assertThat(subtaskCommittableManager.getSuccessfulCommittables()) + .containsExactlyInAnyOrder(1, 2); assertThat(subtaskCommittableManager.isFinished()).isTrue(); } @@ -102,7 +80,6 @@ void testMerge() { new SubtaskCommittableManager<>( Collections.singletonList(new CommitRequestImpl<>(1, METRIC_GROUP)), 5, - 1, 2, 1, 2L, @@ -114,13 +91,11 @@ void testMerge() { new CommitRequestImpl<>(2, METRIC_GROUP), new CommitRequestImpl<>(3, METRIC_GROUP)), 10, - 2, 3, 1, 2L, METRIC_GROUP)); - assertThat(merged.getNumCommittables()).isEqualTo(11); - assertThat(merged.getNumDrained()).isEqualTo(3); + assertThat(merged.getNumCommittables()).isEqualTo(3); assertThat(merged.isFinished()).isFalse(); assertThat(merged.getNumFailed()).isEqualTo(5); assertThat(merged.getPendingRequests()).hasSize(3); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index 756ea0c8022f0..5fdb36a3953f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -62,7 +63,7 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception { testHarness.open(); final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 1, 0); + new CommittableSummary<>(1, 1, 1L, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableWithLineage committableWithLineage = new CommittableWithLineage<>("1", 1L, 1); @@ -77,8 +78,7 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception { assertThat(testHarness.extractOutputValues()).hasSize(2); records.element(0, as(committableSummary())) .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()) - .hasPendingCommittables(0); + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); records.element(1, as(committableWithLineage())) .isEqualTo(committableWithLineage.withSubtaskId(0)); } else { @@ -98,7 +98,7 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { // Only send first committable final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 2, 2, 0); + new CommittableSummary<>(1, 1, 1L, 2, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); testHarness.processElement(new StreamRecord<>(first)); @@ -119,8 +119,7 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { assertThat(testHarness.extractOutputValues()).hasSize(3); records.element(0, as(committableSummary())) .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()) - .hasPendingCommittables(0); + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); testHarness.close(); @@ -136,10 +135,10 @@ void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { testHarness.open(); final CommittableSummary committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); + new CommittableSummary<>(1, 2, EOI, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableSummary committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); + new CommittableSummary<>(2, 2, EOI, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary2)); final CommittableWithLineage first = new CommittableWithLineage<>("1", EOI, 1); @@ -158,8 +157,7 @@ void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { assertThat(testHarness.extractOutputValues()).hasSize(3); records.element(0, as(committableSummary())) .hasFailedCommittables(0) - .hasOverallCommittables(2) - .hasPendingCommittables(0); + .hasOverallCommittables(2); records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); testHarness.close(); @@ -188,7 +186,7 @@ void testStateRestore() throws Exception { long checkpointId = 0L; final CommittableSummary committableSummary = - new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 1, 0); + new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableWithLineage first = new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); @@ -196,7 +194,7 @@ void testStateRestore() throws Exception { // another committable for the same checkpointId but from different subtask. final CommittableSummary committableSummary2 = - new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 1, 0); + new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary2)); final CommittableWithLineage second = new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); @@ -223,12 +221,12 @@ void testStateRestore() throws Exception { assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); ListAssert> records = assertThat(restored.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasFailedCommittables(0) - .hasOverallCommittables(2) - .hasPendingCommittables(0); + CommittableSummaryAssert objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasCheckpointId(checkpointId) + .hasFailedCommittables(0) + .hasSubtaskId(subtaskIdAfterRecovery); + objectCommittableSummaryAssert.hasOverallCommittables(2); // Expect the same checkpointId that the original snapshot was made with. records.element(1, as(committableWithLineage())) @@ -258,7 +256,7 @@ void testNumberOfRetries(int numRetries) throws Exception { long ckdId = 1L; testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0, 0))); + new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); testHarness.processElement( new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); AbstractThrowableAssert throwableAssert = @@ -276,44 +274,43 @@ void testNumberOfRetries(int numRetries) throws Exception { void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< + try (OneInputStreamOperatorTestHarness< CommittableMessage, CommittableMessage> testHarness = new OneInputStreamOperatorTestHarness<>( new CommitterOperatorFactory<>( - sinkAndCounters.sink, false, isCheckpointingEnabled)); - testHarness.open(); + sinkAndCounters.sink, false, isCheckpointingEnabled))) { + testHarness.open(); - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); - testHarness.endInput(); + testHarness.endInput(); - // If checkpointing enabled endInput does not emit anything because a final checkpoint - // follows - if (isCheckpointingEnabled) { - testHarness.notifyOfCompletedCheckpoint(1); - } + // If checkpointing enabled endInput does not emit anything because a final checkpoint + // follows + if (isCheckpointingEnabled) { + testHarness.notifyOfCompletedCheckpoint(1); + } - ListAssert> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - records.element(0, as(committableSummary())) - .hasCheckpointId(1L) - .hasPendingCommittables(0) - .hasOverallCommittables(1) - .hasFailedCommittables(0); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); + ListAssert> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + CommittableSummaryAssert objectCommittableSummaryAssert = + records.element(0, as(committableSummary())).hasCheckpointId(1L); + objectCommittableSummaryAssert.hasOverallCommittables(1); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); - // Future emission calls should change the output - testHarness.notifyOfCompletedCheckpoint(2); - testHarness.endInput(); + // Future emission calls should change the output + testHarness.notifyOfCompletedCheckpoint(2); + testHarness.endInput(); - assertThat(testHarness.getOutput()).hasSize(2); + assertThat(testHarness.getOutput()).hasSize(2); + } } private OneInputStreamOperatorTestHarness< diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index 3de6aa71ecef0..6bde86216b9f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; @@ -295,10 +296,8 @@ void testRestoreCommitterState() throws Exception { assertThat(testHarness.extractOutputValues()).hasSize(4); records.element(0, as(committableSummary())) - .hasPendingCommittables(committables.size()) .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasOverallCommittables(committables.size()) - .hasFailedCommittables(0); + .hasOverallCommittables(committables.size()); records.>element(1, as(committableWithLineage())) .hasCommittable(committables.get(0)) .hasCheckpointId(INITIAL_CHECKPOINT_ID) @@ -307,11 +306,7 @@ void testRestoreCommitterState() throws Exception { .hasCommittable(committables.get(1)) .hasCheckpointId(INITIAL_CHECKPOINT_ID) .hasSubtaskId(0); - records.element(3, as(committableSummary())) - .hasPendingCommittables(0) - .hasCheckpointId(2L) - .hasOverallCommittables(0) - .hasFailedCommittables(0); + records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); } @ParameterizedTest @@ -338,10 +333,7 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex ListAssert> records = assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); - records.element(0, as(committableSummary())) - .hasPendingCommittables(committables.size()) - .hasOverallCommittables(committables.size()) - .hasFailedCommittables(0); + records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); records.filteredOn(message -> message instanceof CommittableWithLineage) .map(message -> ((CommittableWithLineage) message).getCommittable()) @@ -422,10 +414,9 @@ private static void assertBasicOutput( List> output, int numberOfCommittables, long checkpointId) { ListAssert> records = assertThat(output).hasSize(numberOfCommittables + 1); - records.element(0, as(committableSummary())) - .hasOverallCommittables(numberOfCommittables) - .hasPendingCommittables(numberOfCommittables) - .hasFailedCommittables(0); + CommittableSummaryAssert objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasOverallCommittables(numberOfCommittables); records.filteredOn(r -> r instanceof CommittableWithLineage) .allSatisfy( cl ->