Skip to content

Commit

Permalink
KAFKA-14505; [7/N] Always materialize the most recent committed offset (
Browse files Browse the repository at this point in the history
apache#15183)

When transactional offset commits are eventually committed, we must always keep the most recent committed offset when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record alongside the committed offset in memory. Without preserving the offset of the commit record, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.

Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
dajac authored Jan 23, 2024
1 parent 70bd4ce commit 4d6a422
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class CoordinatorLoaderImpl[T](
numRecords = numRecords + 1
try {
coordinator.replay(
record.offset(),
batch.producerId,
batch.producerEpoch,
deserializer.deserialize(record.key, record.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ class CoordinatorLoaderImplTest {

assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))

verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(100L, 5.toShort, ("k6", "v6"))
verify(coordinator).replay(100L, 5.toShort, ("k7", "v7"))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, 100L, 5.toShort, ("k6", "v6"))
verify(coordinator).replay(6L, 100L, 5.toShort, ("k7", "v7"))
verify(coordinator).replayEndTransactionMarker(100L, 5, TransactionResult.COMMIT)
verify(coordinator).replayEndTransactionMarker(500L, 10, TransactionResult.ABORT)
verify(coordinator).updateLastWrittenOffset(2)
Expand Down Expand Up @@ -272,7 +272,7 @@ class CoordinatorLoaderImplTest {

loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)

verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
}
}

Expand Down Expand Up @@ -462,13 +462,13 @@ class CoordinatorLoaderImplTest {

assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))

verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator, times(0)).updateLastWrittenOffset(0)
verify(coordinator, times(1)).updateLastWrittenOffset(2)
verify(coordinator, times(1)).updateLastWrittenOffset(5)
Expand Down Expand Up @@ -563,13 +563,13 @@ class CoordinatorLoaderImplTest {

assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))

verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator, times(0)).updateLastWrittenOffset(0)
verify(coordinator, times(0)).updateLastWrittenOffset(2)
verify(coordinator, times(0)).updateLastWrittenOffset(5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,15 @@ private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
/**
* Replays the Record to update the hard state of the group coordinator.
*
* @param offset The offset of the record in the log.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param record The record to apply to the state machine.
* @throws RuntimeException
*/
@Override
public void replay(
long offset,
long producerId,
short producerEpoch,
Record record
Expand All @@ -657,6 +659,7 @@ public void replay(
case 0:
case 1:
offsetMetadataManager.replay(
offset,
producerId,
(OffsetCommitKey) key.message(),
(OffsetCommitValue) messageOrNull(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class OffsetAndMetadata {
/**
* The committed offset.
*/
public final long offset;
public final long committedOffset;

/**
* The leader epoch in use when the offset was committed.
Expand All @@ -61,14 +61,38 @@ public class OffsetAndMetadata {
*/
public final OptionalLong expireTimestampMs;

/**
* The offset of the commit record in the log.
*/
public final long recordOffset;

/**
 * Creates an OffsetAndMetadata with no associated commit record offset.
 *
 * Delegates to the primary constructor with a record offset of -1L, the
 * sentinel used when the offset commit is not (yet) backed by a record read
 * from the log. NOTE(review): the -1L sentinel meaning is inferred from the
 * delegation below — confirm against the primary constructor's contract.
 *
 * @param committedOffset   The committed offset.
 * @param leaderEpoch       The leader epoch in use when the offset was committed.
 * @param metadata          The client-provided metadata string.
 * @param commitTimestampMs The commit timestamp in milliseconds.
 * @param expireTimestampMs The optional expiration timestamp in milliseconds.
 */
public OffsetAndMetadata(
    long committedOffset,
    OptionalInt leaderEpoch,
    String metadata,
    long commitTimestampMs,
    OptionalLong expireTimestampMs
) {
    this(
        -1L,
        committedOffset,
        leaderEpoch,
        metadata,
        commitTimestampMs,
        expireTimestampMs
    );
}

public OffsetAndMetadata(
long offset,
long recordOffset,
long committedOffset,
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
OptionalLong expireTimestampMs
) {
this.offset = offset;
this.recordOffset = recordOffset;
this.committedOffset = committedOffset;
this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
this.metadata = Objects.requireNonNull(metadata);
this.commitTimestampMs = commitTimestampMs;
Expand All @@ -77,11 +101,12 @@ public OffsetAndMetadata(

@Override
public String toString() {
return "OffsetAndMetadata(offset=" + offset +
return "OffsetAndMetadata(offset=" + committedOffset +
", leaderEpoch=" + leaderEpoch +
", metadata=" + metadata +
", commitTimestampMs=" + commitTimestampMs +
", expireTimestampMs=" + expireTimestampMs +
", recordOffset=" + recordOffset +
')';
}

Expand All @@ -92,30 +117,34 @@ public boolean equals(Object o) {

OffsetAndMetadata that = (OffsetAndMetadata) o;

if (offset != that.offset) return false;
if (committedOffset != that.committedOffset) return false;
if (commitTimestampMs != that.commitTimestampMs) return false;
if (!leaderEpoch.equals(that.leaderEpoch)) return false;
if (!metadata.equals(that.metadata)) return false;
return expireTimestampMs.equals(that.expireTimestampMs);
if (recordOffset != that.recordOffset) return false;
if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;
if (!Objects.equals(metadata, that.metadata)) return false;
return Objects.equals(expireTimestampMs, that.expireTimestampMs);
}

/**
 * Computes a hash code over all fields, consistent with equals().
 *
 * References that could be null are guarded so hashCode never throws; the
 * stale duplicate `result` declaration that referenced the removed `offset`
 * field has been dropped.
 */
@Override
public int hashCode() {
    int result = (int) (committedOffset ^ (committedOffset >>> 32));
    result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() : 0);
    result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
    result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs >>> 32));
    result = 31 * result + (expireTimestampMs != null ? expireTimestampMs.hashCode() : 0);
    // Fold in the commit record's log offset so two entries with the same
    // committed offset but different record offsets hash differently.
    result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32));
    return result;
}

/**
* @return An OffsetAndMetadata created from a OffsetCommitValue record.
*/
public static OffsetAndMetadata fromRecord(
long recordOffset,
OffsetCommitValue record
) {
return new OffsetAndMetadata(
recordOffset,
record.offset(),
ofSentinel(record.leaderEpoch()),
record.metadata(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
} else {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedOffset(offsetAndMetadata.committedOffset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
}
Expand Down Expand Up @@ -799,7 +799,7 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
} else {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedOffset(offsetAndMetadata.committedOffset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
}
Expand Down Expand Up @@ -884,12 +884,14 @@ private TopicPartition appendOffsetCommitTombstone(
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding offsets.
*
* @param producerId The producer id of the batch containing the provided
* key and value.
* @param key A OffsetCommitKey key.
* @param value A OffsetCommitValue value.
* @param recordOffset The offset of the record in the log.
* @param producerId The producer id of the batch containing the provided
* key and value.
* @param key A OffsetCommitKey key.
* @param value A OffsetCommitValue value.
*/
public void replay(
long recordOffset,
long producerId,
OffsetCommitKey key,
OffsetCommitValue value
Expand Down Expand Up @@ -918,7 +920,7 @@ public void replay(
groupId,
topic,
partition,
OffsetAndMetadata.fromRecord(value)
OffsetAndMetadata.fromRecord(recordOffset, value)
);
if (previousValue == null) {
metrics.incrementNumOffsets();
Expand All @@ -934,7 +936,7 @@ public void replay(
groupId,
topic,
partition,
OffsetAndMetadata.fromRecord(value)
OffsetAndMetadata.fromRecord(recordOffset, value)
);
openTransactionsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
Expand Down Expand Up @@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
topicOffsets.forEach((topicName, partitionOffsets) -> {
partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
log.debug("Committed transaction offset commit for producer id {} in group {} " +
"with topic {}, partition {}, and offset {}.",
producerId, groupId, topicName, partitionId, offsetAndMetadata);
offsets.put(
OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
groupId,
topicName,
partitionId,
offsetAndMetadata
partitionId
);

// We always keep the most recent committed offset when we have a mix of transactional and regular
// offset commits. Without preserving information of the commit record offset, compaction of the
// __consumer_offsets topic itself may result in the wrong offset commit being materialized.
if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
"with topic {} and partition {}.",
offsetAndMetadata, producerId, groupId, topicName, partitionId);
offsets.put(
groupId,
topicName,
partitionId,
offsetAndMetadata
);
} else {
log.info("Skipped the materialization of transactional offset commit {} for producer id {} in group {} with topic {}, " +
"partition {} since its record offset {} is smaller than the record offset {} of the last committed offset.",
offsetAndMetadata, producerId, groupId, topicName, partitionId, offsetAndMetadata.recordOffset, existingOffsetAndMetadata.recordOffset);
}
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public static Record newOffsetCommitRecord(
),
new ApiMessageAndVersion(
new OffsetCommitValue()
.setOffset(offsetAndMetadata.offset)
.setOffset(offsetAndMetadata.committedOffset)
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setMetadata(offsetAndMetadata.metadata)
.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ public interface CoordinatorPlayback<U> {
/**
* Applies the given record to this object.
*
* @param offset The offset of the record in the log.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param record A record.
* @throws RuntimeException if the record can not be applied.
*/
void replay(
long offset,
long producerId,
short producerEpoch,
U record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,13 +713,17 @@ public void run() {
try {
// Apply the records to the state machine.
if (result.replayRecords()) {
result.records().forEach(record ->
// We compute the offset of the record based on the last written offset. The
// coordinator is the single writer to the underlying partition so we can
// deduce it like this.
for (int i = 0; i < result.records().size(); i++) {
context.coordinator.replay(
prevLastWrittenOffset + i,
producerId,
producerEpoch,
record
)
);
result.records().get(i)
);
}
}

// Write the records to the log and update the last written
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ default void onUnloaded() {}
/**
* Replay a record to update the state machine.
*
* @param offset The offset of the record in the log.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param record The record to replay.
*/
void replay(
long offset,
long producerId,
short producerEpoch,
U record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ synchronized void revertLastWrittenOffset(
/**
 * Replays the record onto the state machine.
 *
 * @param offset        The offset of the record in the log.
 * @param producerId    The producer id.
 * @param producerEpoch The producer epoch.
 * @param record        A record.
 */
@Override
public synchronized void replay(
    long offset,
    long producerId,
    short producerEpoch,
    U record
) {
    // Forward the record's log offset to the underlying coordinator; the
    // stale 3-arg call (which no longer matches the updated Coordinator
    // interface) has been removed.
    coordinator.replay(offset, producerId, producerEpoch, record);
}

/**
Expand Down
Loading

0 comments on commit 4d6a422

Please sign in to comment.