MINOR: Introduce ImageWriter and ImageWriterOptions (apache#12715)
This PR adds a new ImageWriter interface, replacing the generic Consumer interface that accepted
lists of records. It is better to do batching in the ImageWriter than to deal with that complexity
in the MetadataImage#write functions, especially since batching is not semantically meaningful in
KRaft snapshots. The new ImageWriter interface also supports freeze and close, which more closely
matches the semantics of the underlying Raft classes.
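
Based on this description and the call sites visible in the diff below (writer.write(0, record),
writer.write(record), writer.close()), the interface plausibly looks something like the following
minimal sketch; the exact method set in the PR may differ:

    package org.apache.kafka.image.writer;

    import org.apache.kafka.common.protocol.ApiMessage;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    // Minimal sketch of the ImageWriter contract described above. Implementations
    // such as RecordListWriter and RaftSnapshotWriter (both used in the diff below)
    // decide how to batch records; callers simply emit one record at a time.
    public interface ImageWriter extends AutoCloseable {
        // Write a record, pinning its schema version explicitly at the call site.
        default void write(int version, ApiMessage message) {
            write(new ApiMessageAndVersion(message, (short) version));
        }

        // Write a record whose version has already been chosen.
        void write(ApiMessageAndVersion record);

        // Mark the output as complete; mirrors freezing the underlying Raft snapshot.
        void freeze();

        // Release resources; must be safe whether or not freeze() was called.
        void close();
    }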

The PR also adds an ImageWriterOptions class which we can use to pass parameters that control how
the new image is written. Right now, the parameters we are interested in are the target metadata
version (which may be higher or lower than the original image's version) and a handler function
that is invoked whenever metadata is lost because the target version cannot represent it.
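
A sketch of how these options might be assembled. The Builder and setMetadataVersion calls match
the usage visible in the diff below; the setLossHandler name and the handler's argument type are
assumptions for illustration only:

    import org.apache.kafka.image.writer.ImageWriterOptions;
    import org.apache.kafka.server.common.MetadataVersion;

    public class ImageWriterOptionsExample {
        public static ImageWriterOptions downgradeOptions() {
            // setMetadataVersion(...) matches the call sites in the diff below;
            // setLossHandler(...) is a hypothetical name for the loss-handler setter.
            return new ImageWriterOptions.Builder().
                setMetadataVersion(MetadataVersion.IBP_3_0_IV1).
                setLossHandler(loss -> System.err.println("Lost metadata: " + loss)).
                build();
        }
    }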

Convert the MetadataImage#write function (and associated functions) to use the new ImageWriter
and ImageWriterOptions. In particular, we now have a way to handle metadata losses by invoking
ImageWriterOptions#handleLoss. This allows us, for the first time, to handle writing an image at a
lower metadata version. This support is not yet enabled externally by this PR, though; that will
come in a future PR.
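
As a hedged illustration of that pattern (the record type, feature gate, and accessor names below
are stand-ins, not the PR's exact code), an image's write method can consult the target version
and report anything it cannot represent:

    // Hypothetical image class, sketching the loss-handling pattern described above.
    public void write(ImageWriter writer, ImageWriterOptions options) {
        for (SomeEntry entry : entries.values()) {
            // Assume SOME_FEATURE_VERSION is the MetadataVersion that introduced this
            // record type; older target versions cannot represent it.
            if (options.metadataVersion().isAtLeast(SOME_FEATURE_VERSION)) {
                writer.write(0, entry.toRecord());
            } else {
                options.handleLoss("the entry for " + entry.name());
            }
        }
    }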

Get rid of the use of SOME_RECORD_TYPE.highestSupportedVersion() in several places. In general, we
do not want to "silently" change the version of a record that we output, just because a new version
was added. We should be explicit about what record version numbers we are outputting.
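
The change is visible in ConfigurationImage in the diff below, where the record version goes from
implicitly tracking the schema to being pinned explicitly:

    // Before: the record's version silently tracks the newest schema version.
    records.add(new ApiMessageAndVersion(new ConfigRecord().
        setResourceType(configResource.type().id()).
        setResourceName(configResource.name()).
        setName(entry.getKey()).
        setValue(entry.getValue()), CONFIG_RECORD.highestSupportedVersion()));

    // After: version 0 is pinned explicitly at the call site.
    writer.write(0, new ConfigRecord().
        setResourceType(configResource.type().id()).
        setResourceName(configResource.name()).
        setName(entry.getKey()).
        setValue(entry.getValue()));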

Implement ProducerIdsDelta#toString to make debug logs look better.

Move MockRandom to the server-common package so that other internal broker packages can use it.

Reviewers: José Armando García Sancio <[email protected]>
cmccabe authored Oct 13, 2022
1 parent 18e60cb commit dac8116
Showing 40 changed files with 863 additions and 216 deletions.
8 changes: 7 additions & 1 deletion checkstyle/import-control.xml
@@ -235,6 +235,7 @@
     <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.server.policy"/>
+    <allow pkg="org.apache.kafka.server.util"/>
     <allow pkg="org.apache.kafka.snapshot" />
     <allow pkg="org.apache.kafka.test" />
     <allow pkg="org.apache.kafka.timeline" />
@@ -246,11 +247,15 @@
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.quota" />
-    <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.image" />
+    <allow pkg="org.apache.kafka.image.writer" />
     <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.raft" />
+    <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.util" />
+    <allow pkg="org.apache.kafka.snapshot" />
   </subpackage>

   <subpackage name="metadata">
@@ -269,6 +274,7 @@
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.authorizer" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.util"/>
     <allow pkg="org.apache.kafka.test" />
     <subpackage name="authorizer">
       <allow pkg="org.apache.kafka.common.acl" />
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -18,10 +18,10 @@ package kafka.server.metadata

 import java.util
 import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.function.Consumer
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
 import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
@@ -390,15 +390,18 @@ class BrokerMetadataListener(
   }

   class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
-      extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] {
-    val records = new util.ArrayList[ApiMessageAndVersion]()
-    override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-      records.addAll(batch)
-    }
-
+      extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      _image.write(this)
-      future.complete(records)
+      val writer = new RecordListWriter()
+      val options = new ImageWriterOptions.Builder().
+        setMetadataVersion(_image.features().metadataVersion()).
+        build()
+      try {
+        _image.write(writer, options)
+      } finally {
+        writer.close()
+      }
+      future.complete(writer.records())
     }
   }
 }
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -20,39 +20,18 @@ import java.util.concurrent.RejectedExecutionException
 import kafka.utils.Logging
 import org.apache.kafka.image.MetadataImage
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.image.writer.{ImageWriterOptions, RaftSnapshotWriter}
 import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.snapshot.SnapshotWriter

-import java.util.function.Consumer
-
 trait SnapshotWriterBuilder {
   def build(committedOffset: Long,
             committedEpoch: Int,
             lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]]
 }

-/**
- * The RecordListConsumer takes as input a potentially long list of records, and feeds the
- * SnapshotWriter a series of smaller lists of records.
- *
- * Note: from the perspective of Kafka, the snapshot file is really just a list of records,
- * and we don't care about batches. Batching is irrelevant to the meaning of the snapshot.
- */
-class RecordListConsumer(
-  val maxRecordsInBatch: Int,
-  val writer: SnapshotWriter[ApiMessageAndVersion]
-) extends Consumer[java.util.List[ApiMessageAndVersion]] {
-  override def accept(messages: java.util.List[ApiMessageAndVersion]): Unit = {
-    var i = 0
-    while (i < messages.size()) {
-      writer.append(messages.subList(i, Math.min(i + maxRecordsInBatch, messages.size())));
-      i += maxRecordsInBatch
-    }
-  }
-}
-
 class BrokerMetadataSnapshotter(
   brokerId: Int,
   val time: Time,
@@ -108,15 +87,18 @@ class BrokerMetadataSnapshotter(
     }
   }

-  class CreateSnapshotEvent(image: MetadataImage,
-                            writer: SnapshotWriter[ApiMessageAndVersion])
-      extends EventQueue.Event {
+  class CreateSnapshotEvent(
+    image: MetadataImage,
+    snapshotWriter: SnapshotWriter[ApiMessageAndVersion]
+  ) extends EventQueue.Event {

     override def run(): Unit = {
+      val writer = new RaftSnapshotWriter(snapshotWriter, maxRecordsInBatch)
+      val options = new ImageWriterOptions.Builder().
+        setMetadataVersion(image.features().metadataVersion()).
+        build()
       try {
-        val consumer = new RecordListConsumer(maxRecordsInBatch, writer)
-        image.write(consumer)
-        writer.freeze()
+        image.write(writer, options)
       } finally {
         try {
           writer.close()
@@ -134,7 +116,6 @@
           info("Not processing CreateSnapshotEvent because the event queue is closed.")
         case _ => error("Unexpected error handling CreateSnapshotEvent", e)
       }
-      writer.close()
     }
   }

core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CountDownLatch}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.metadata.FenceBrokerRecord
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
 import org.apache.kafka.common.utils.Time
@@ -36,7 +35,6 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test

 import java.util
-import java.util.Arrays.asList
 import scala.compat.java8.OptionConverters._

 class BrokerMetadataSnapshotterTest {
@@ -141,20 +139,4 @@ class BrokerMetadataSnapshotterTest {
     override def freeze(): Unit = {}
     override def close(): Unit = {}
   }
-
-  @Test
-  def testRecordListConsumer(): Unit = {
-    val writer = new MockSnapshotWriter()
-    val consumer = new RecordListConsumer(3, writer)
-    val m = new ApiMessageAndVersion(new FenceBrokerRecord().setId(1).setEpoch(1), 0.toShort)
-    consumer.accept(asList(m, m))
-    assertEquals(asList(asList(m, m)), writer.batches)
-    consumer.accept(asList(m))
-    assertEquals(asList(asList(m, m), asList(m)), writer.batches)
-    consumer.accept(asList(m, m, m, m))
-    assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m)), writer.batches)
-    consumer.accept(asList(m, m, m, m, m, m, m, m))
-    assertEquals(asList(asList(m, m), asList(m), asList(m, m, m), asList(m), asList(m, m, m), asList(m, m, m), asList(m, m)),
-      writer.batches)
-  }
 }
16 changes: 8 additions & 8 deletions metadata/src/main/java/org/apache/kafka/image/AclsImage.java
@@ -18,16 +18,14 @@
 package org.apache.kafka.image;

 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
 import org.apache.kafka.metadata.authorizer.StandardAclWithId;
-import org.apache.kafka.server.common.ApiMessageAndVersion;

-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;


@@ -53,13 +51,15 @@ public Map<Uuid, StandardAcl> acls() {
         return acls;
     }

-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        // Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we should not write it if
+        // the output version is less than that. However, there is a problem: pre-production KRaft
+        // images didn't support FeatureLevelRecord, so we can't distinguish 3.2-IV0 from 3.0-IV1.
+        // The least bad way to resolve this is just to pretend that ACLs were in 3.0-IV1.
         for (Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
             StandardAclWithId aclWithId = new StandardAclWithId(entry.getKey(), entry.getValue());
-            batch.add(new ApiMessageAndVersion(aclWithId.toRecord(), (short) 0));
+            writer.write(0, aclWithId.toRecord());
         }
-        out.accept(batch);
     }

     @Override
metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -21,7 +21,8 @@
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;

 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,11 +31,8 @@
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;

-import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
-

 /**
  * Represents a quota for a client entity in the metadata image.
@@ -54,17 +52,18 @@ Map<String, Double> quotas() {
         return quotas;
     }

-    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> records = new ArrayList<>(quotas.size());
+    public void write(
+        ClientQuotaEntity entity,
+        ImageWriter writer,
+        ImageWriterOptions options
+    ) {
         for (Entry<String, Double> entry : quotas.entrySet()) {
-            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+            writer.write(0, new ClientQuotaRecord().
                 setEntity(entityToData(entity)).
                 setKey(entry.getKey()).
                 setValue(entry.getValue()).
-                setRemove(false),
-                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+                setRemove(false));
         }
-        out.accept(records);
     }

     public static List<EntityData> entityToData(ClientQuotaEntity entity) {
metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -24,17 +24,16 @@
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;

 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;

 import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
@@ -67,11 +66,11 @@ Map<ClientQuotaEntity, ClientQuotaImage> entities() {
         return entities;
     }

-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
+    public void write(ImageWriter writer, ImageWriterOptions options) {
         for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
             ClientQuotaEntity entity = entry.getKey();
             ClientQuotaImage clientQuotaImage = entry.getValue();
-            clientQuotaImage.write(entity, out);
+            clientQuotaImage.write(entity, writer, options);
         }
     }

13 changes: 4 additions & 9 deletions metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -17,15 +17,12 @@

 package org.apache.kafka.image;

+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.BrokerRegistration;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;

-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;

 /**
@@ -54,12 +51,10 @@ public BrokerRegistration broker(int nodeId) {
         return brokers.get(nodeId);
     }

-    public void write(Consumer<List<ApiMessageAndVersion>> out, MetadataVersion metadataVersion) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
         for (BrokerRegistration broker : brokers.values()) {
-            batch.add(broker.toRecord(metadataVersion));
+            writer.write(broker.toRecord(options));
         }
-        out.accept(batch);
     }

     @Override
metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -19,18 +19,15 @@

 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;

-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;

-import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;


 /**
@@ -61,16 +58,18 @@ public Properties toProperties() {
         return properties;
     }

-    public void write(ConfigResource configResource, Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> records = new ArrayList<>();
+    public void write(
+        ConfigResource configResource,
+        ImageWriter writer,
+        ImageWriterOptions options
+    ) {
         for (Map.Entry<String, String> entry : data.entrySet()) {
-            records.add(new ApiMessageAndVersion(new ConfigRecord().
+            writer.write(0, new ConfigRecord().
                 setResourceType(configResource.type().id()).
                 setResourceName(configResource.name()).
                 setName(entry.getKey()).
-                setValue(entry.getValue()), CONFIG_RECORD.highestSupportedVersion()));
+                setValue(entry.getValue()));
         }
-        out.accept(records);
     }

     @Override