Skip to content

Commit

Permalink
[FLINK-36455] Deprecate async parts of committable summary
Browse files Browse the repository at this point in the history
Without async parts of committable summary, number of pending committables will always be 0.

Failed committables will also be 0 as they will throw an error if unexpected or not they are silently ignored. The previous behavior with them being >0 actually led to infinite loops in the global committer.
  • Loading branch information
AHeise committed Nov 6, 2024
1 parent 21c344c commit 13fe0e6
Show file tree
Hide file tree
Showing 18 changed files with 150 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommitt
summary.getNumberOfSubtasks(),
summary.getCheckpointIdOrEOI(),
summary.getNumberOfCommittables() + results.size(),
summary.getNumberOfPendingCommittables() + results.size(),
summary.getNumberOfFailedCommittables())));
for (FileSinkCommittable committable : results) {
output.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ void testCompact() throws Exception {
// 1summary+1compacted+2cleanup
ListAssert<CommittableMessage<FileSinkCommittable>> results =
assertThat(harness.extractOutputValues()).hasSize(4);
results.element(0, as(committableSummary())).hasPendingCommittables(3);
results.element(1, as(committableWithLineage()))
.hasCommittable(committable("0", "compacted-0", 10));
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0"));
Expand Down Expand Up @@ -140,7 +139,6 @@ void testPassthrough() throws Exception {

ListAssert<CommittableMessage<FileSinkCommittable>> messages =
assertThat(harness.extractOutputValues()).hasSize(3);
messages.element(0, as(committableSummary())).hasPendingCommittables(2);
messages.element(1, as(committableWithLineage()))
.hasCommittable(cleanupInprogressRequest);
messages.element(2, as(committableWithLineage())).hasCommittable(cleanupPathRequest);
Expand Down Expand Up @@ -207,13 +205,13 @@ void testRestore() throws Exception {
// 1summary+1compacted+2cleanup * 2
ListAssert<CommittableMessage<FileSinkCommittable>> results =
assertThat(harness.extractOutputValues()).hasSize(8);
results.element(0, as(committableSummary())).hasPendingCommittables(3);
results.element(0, as(committableSummary()));
results.element(1, as(committableWithLineage()))
.hasCommittable(committable("0", "compacted-0", 10));
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0"));
results.element(3, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1"));

results.element(4, as(committableSummary())).hasPendingCommittables(3);
results.element(4, as(committableSummary()));
results.element(5, as(committableWithLineage()))
.hasCommittable(committable("0", "compacted-2", 10));
results.element(6, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2"));
Expand Down Expand Up @@ -311,7 +309,7 @@ void testStateHandler() throws Exception {
// + 1 summary + 1 cleanup + 1 summary
ListAssert<CommittableMessage<FileSinkCommittable>> results =
assertThat(harness.extractOutputValues()).hasSize(18);
results.element(0, as(committableSummary())).hasPendingCommittables(14);
results.element(0, as(committableSummary()));

List<FileSinkCommittable> expectedResult =
Arrays.asList(
Expand All @@ -335,11 +333,11 @@ void testStateHandler() throws Exception {
.hasCommittable(expectedResult.get(i));
}

results.element(15, as(committableSummary())).hasPendingCommittables(1);
results.element(15, as(committableSummary()));
results.element(16, as(committableWithLineage()))
.hasCommittable(cleanupPath("0", ".6"));

results.element(17, as(committableSummary())).hasPendingCommittables(3);
results.element(17, as(committableSummary()));
}
}

Expand Down Expand Up @@ -378,7 +376,7 @@ void testStateHandlerRestore() throws Exception {

ListAssert<CommittableMessage<FileSinkCommittable>> results =
assertThat(harness.extractOutputValues()).hasSize(3);
results.element(0, as(committableSummary())).hasPendingCommittables(4);
results.element(0, as(committableSummary()));
results.element(1, as(committableWithLineage()))
.hasCommittable(committable("0", "compacted-1", 1));
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1"));
Expand Down Expand Up @@ -437,7 +435,7 @@ void testStateHandlerRestore() throws Exception {

ListAssert<CommittableMessage<FileSinkCommittable>> results =
assertThat(harness.extractOutputValues()).hasSize(2);
results.element(0, as(committableSummary())).hasPendingCommittables(1);
results.element(0, as(committableSummary()));
results.element(1, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,29 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
private final long checkpointId;
/** The number of committables coming from the given subtask in the particular checkpoint. */
private final int numberOfCommittables;

@Deprecated
/** The number of committables that have not been successfully committed. */
private final int numberOfPendingCommittables;
/** The number of committables that are not retried and have been failed. */
private final int numberOfFailedCommittables;

public CommittableSummary(
int subtaskId,
int numberOfSubtasks,
long checkpointId,
int numberOfCommittables,
int numberOfFailedCommittables) {
this(
subtaskId,
numberOfSubtasks,
checkpointId,
numberOfCommittables,
0,
numberOfFailedCommittables);
}

@Deprecated
public CommittableSummary(
int subtaskId,
int numberOfSubtasks,
Expand Down Expand Up @@ -75,8 +93,9 @@ public int getNumberOfCommittables() {
return numberOfCommittables;
}

@Deprecated
public int getNumberOfPendingCommittables() {
return numberOfPendingCommittables;
return 0;
}

public int getNumberOfFailedCommittables() {
Expand All @@ -89,7 +108,6 @@ public <NewCommT> CommittableSummary<NewCommT> map() {
numberOfSubtasks,
checkpointId,
numberOfCommittables,
numberOfPendingCommittables,
numberOfFailedCommittables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ private void emit(CheckpointCommittableManager<CommT> committableManager) {
numberOfSubtasks,
checkpointId,
committables.size(),
0,
0)));
committableManager.getNumFailed())));
for (CommT committable : committables) {
output.collect(
new StreamRecord<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,24 @@ public interface CheckpointCommittableManager<CommT> {
void commit(Committer<CommT> committer, int maxRetries)
throws IOException, InterruptedException;

/**
* Returns the number of committables that have been successfully committed; that is, the
* corresponding {@link org.apache.flink.api.connector.sink2.Committer.CommitRequest} was not
* used to signal an error of any kind (retryable or not).
*
* @return number of successful committables
*/
Collection<CommT> getSuccessfulCommittables();

/**
* Returns the number of committables that have failed with a known error. By the current
* semantics of {@link
* org.apache.flink.api.connector.sink2.Committer.CommitRequest#signalFailedWithKnownReason(Throwable)}
* discards the committable but proceeds processing. The returned number should be emitted
* downstream in a {@link org.apache.flink.streaming.api.connector.sink2.CommittableSummary},
* such that downstream can assess if all committables have been processed.
*
* @return number of failed committables
*/
int getNumFailed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public Collection<CommT> getSuccessfulCommittables() {
.collect(Collectors.toList());
}

@Override
public int getNumFailed() {
return subtasksCommittableManagers.values().stream()
.mapToInt(SubtaskCommittableManager::getNumFailed)
.sum();
}

Stream<CommitRequestImpl<CommT>> getPendingRequests() {
return subtasksCommittableManagers.values().stream()
.peek(this::assertReceivedAll)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public SubtaskSimpleVersionedSerializer() {

@Override
public int getVersion() {
return 1;
return 2;
}

@Override
Expand All @@ -219,7 +219,6 @@ public byte[] serialize(SubtaskCommittableManager<CommT> subtask) throws IOExcep
new ArrayList<>(subtask.getRequests()),
out);
out.writeInt(subtask.getNumCommittables());
out.writeInt(subtask.getNumDrained());
out.writeInt(subtask.getNumFailed());
return out.getCopyOfBuffer();
}
Expand All @@ -236,7 +235,7 @@ public SubtaskCommittableManager<CommT> deserialize(int version, byte[] serializ
return new SubtaskCommittableManager<>(
requests,
in.readInt(),
in.readInt(),
version >= 2 ? 0 : in.readInt(),
in.readInt(),
subtaskId,
checkNotNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -46,7 +43,7 @@ class SubtaskCommittableManager<CommT> {
private final int numExpectedCommittables;
private final long checkpointId;
private final int subtaskId;
private int numDrained;
@Deprecated private int numDrained;
private int numFailed;
private final SinkCommitterMetricGroup metricGroup;

Expand All @@ -65,6 +62,17 @@ class SubtaskCommittableManager<CommT> {
metricGroup);
}

SubtaskCommittableManager(
Collection<CommitRequestImpl<CommT>> requests,
int numExpectedCommittables,
int numFailed,
int subtaskId,
long checkpointId,
SinkCommitterMetricGroup metricGroup) {
this(requests, numExpectedCommittables, 0, numFailed, subtaskId, checkpointId, metricGroup);
}

@Deprecated
SubtaskCommittableManager(
Collection<CommitRequestImpl<CommT>> requests,
int numExpectedCommittables,
Expand Down Expand Up @@ -98,7 +106,7 @@ void add(CommT committable) {
* @return if all committables have been received
*/
boolean hasReceivedAll() {
return getNumCommittables() == numExpectedCommittables;
return getNumCommittables() == numExpectedCommittables + numFailed;
}

/**
Expand All @@ -107,26 +115,15 @@ boolean hasReceivedAll() {
* @return number of so far received committables
*/
int getNumCommittables() {
return requests.size() + numDrained + numFailed;
}

/**
* Returns the number of still expected commits.
*
* <p>Either the committables are not yet received or the commit is still pending.
*
* @return number of still expected committables
*/
int getNumPending() {
return numExpectedCommittables - (numDrained + numFailed);
return requests.size();
}

int getNumFailed() {
return numFailed;
}

boolean isFinished() {
return getNumPending() == 0;
return getPendingRequests().findAny().isEmpty();
}

/**
Expand All @@ -145,43 +142,6 @@ Stream<CommT> getSuccessfulCommittables() {
.map(CommitRequestImpl::getCommittable);
}

/**
* Iterates through all currently registered {@link #requests} and returns all {@link
* CommittableWithLineage} that could be successfully committed.
*
* <p>Invoking this method does not yield the same {@link CommittableWithLineage} again. Once
* retrieved they are not part of {@link #requests} anymore.
*
* @return list of {@link CommittableWithLineage}
*/
List<CommittableWithLineage<CommT>> drainCommitted() {
List<CommittableWithLineage<CommT>> committed = new ArrayList<>(requests.size());
for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
iterator.hasNext(); ) {
CommitRequestImpl<CommT> request = iterator.next();
if (!request.isFinished()) {
continue;
}
if (request.getState() == CommitRequestState.FAILED) {
numFailed += 1;
iterator.remove();
continue;
} else {
committed.add(
new CommittableWithLineage<>(
request.getCommittable(), checkpointId, subtaskId));
}
iterator.remove();
}

numDrained += committed.size();
return committed;
}

int getNumDrained() {
return numDrained;
}

int getSubtaskId() {
return subtaskId;
}
Expand All @@ -202,7 +162,6 @@ SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
Stream.concat(requests.stream(), other.requests.stream())
.collect(Collectors.toList()),
numExpectedCommittables + other.numExpectedCommittables,
numDrained + other.numDrained,
numFailed + other.numFailed,
subtaskId,
checkpointId,
Expand All @@ -213,7 +172,6 @@ SubtaskCommittableManager<CommT> copy() {
return new SubtaskCommittableManager<>(
requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList()),
numExpectedCommittables,
numDrained,
numFailed,
subtaskId,
checkpointId,
Expand Down Expand Up @@ -254,8 +212,6 @@ public String toString() {
+ checkpointId
+ ", subtaskId="
+ subtaskId
+ ", numDrained="
+ numDrained
+ ", numFailed="
+ numFailed
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void testCommittableWithLinageSerDe() throws IOException {
@Test
void testCommittableSummarySerDe() throws IOException {
final CommittableSummary<Integer> committableSummary =
new CommittableSummary<>(1, 2, 3L, 4, 5, 6);
new CommittableSummary<>(1, 2, 3L, 4, 5);
final CommittableMessage<Integer> message =
SERIALIZER.deserialize(
CommittableMessageSerializer.VERSION,
Expand All @@ -58,7 +58,5 @@ void testCommittableSummarySerDe() throws IOException {
assertThat(copy.getNumberOfSubtasks()).isEqualTo(2);
assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(3L);
assertThat(copy.getNumberOfCommittables()).isEqualTo(4);
assertThat(copy.getNumberOfPendingCommittables()).isEqualTo(5);
assertThat(copy.getNumberOfFailedCommittables()).isEqualTo(6);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public CommittableSummaryAssert<CommT> hasOverallCommittables(int committableNum
return returns(committableNumber, CommittableSummary::getNumberOfCommittables);
}

public CommittableSummaryAssert<CommT> hasPendingCommittables(int committableNumber) {
return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables);
}

public CommittableSummaryAssert<CommT> hasFailedCommittables(int committableNumber) {
return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables);
}
Expand Down
Loading

0 comments on commit 13fe0e6

Please sign in to comment.