diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 4906cf5172b11..8ae3bf31ad4c2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -50,6 +50,7 @@
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
@@ -69,6 +70,8 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
+    private List<ProducerInterceptor<K, V>> interceptors;
+
     public PulsarKafkaProducer(Map<String, Object> configs) {
         this(configs, null, null);
     }
@@ -157,6 +160,9 @@ private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Ser
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
         boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
+
+        interceptors = (List) producerConfig.getConfiguredInstances(
+                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
     }
 
     @Override
@@ -238,7 +244,13 @@ private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String t
         try {
             // Add the partitions info for the new topic
             cluster = cluster.withPartitions(readPartitionsInfo(topic));
-            return pulsarProducerBuilder.clone().topic(topic).create();
+            List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
+                    .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
+                    .collect(Collectors.toList());
+            return pulsarProducerBuilder.clone()
+                    .topic(topic)
+                    .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
+                    .create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
@@ -308,5 +320,23 @@ private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder
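For context before the new wrapper class: the Pulsar client SPI it implements. This is a trimmed reference sketch of `org.apache.pulsar.client.api.ProducerInterceptor`, with the method set inferred from the wrapper's `@Override` methods below — treat it as an illustration, not the verbatim interface:

```java
package org.apache.pulsar.client.api;

// Reference sketch only: signatures mirror the wrapper's @Override methods.
public interface ProducerInterceptor {

    // Release any resources held by the interceptor.
    void close();

    // Intercept (and possibly replace) a message before the producer sends it.
    Message beforeSend(Producer producer, Message message);

    // Observe the broker's acknowledgement; exception is null on success.
    void onSendAcknowledgement(Producer producer, Message message,
                               MessageId msgId, Throwable exception);
}
```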
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
new file mode 100644
+/**
+ * Extend {@link ProducerInterceptor} as all Pulsar {@link Message}s created by
+ * {@link org.apache.kafka.clients.producer.PulsarKafkaProducer} are of type byte[].
+ *
+ */
+public class KafkaProducerInterceptorWrapper<K, V> implements ProducerInterceptor {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaProducerInterceptorWrapper.class);
+
+    private final org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
+
+    // Used to serialize the key/value, and to determine the matching deserializers.
+    private final Serializer<K> keySerializer;
+
+    private final Serializer<V> valueSerializer;
+
+    // Each Pulsar producer is tied to one Kafka topic, and each ProducerInterceptor is tied to one Pulsar
+    // producer, so the topic is safe to declare final.
+    private final String topic;
+
+    private Schema<byte[]> scheme;
+
+    private long eventTime;
+
+    private String partitionID;
+
+    /**
+     * Create a wrapper of type {@link ProducerInterceptor} that delegates all work to the underlying Kafka interceptor.
+     *
+     * @param kafkaProducerInterceptor Underlying instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+     *                                 that this wrapper delegates work to.
+     * @param keySerializer {@link Serializer} used to serialize the Kafka {@link ProducerRecord#key}.
+     * @param valueSerializer {@link Serializer} used to serialize the Kafka {@link ProducerRecord#value}.
+     * @param topic Topic this {@link ProducerInterceptor} is associated with.
+     */
+    public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
+                                           Serializer<K> keySerializer,
+                                           Serializer<V> valueSerializer,
+                                           String topic) {
+        this.kafkaProducerInterceptor = kafkaProducerInterceptor;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.topic = topic;
+    }
+
+    /**
+     * Called when the interceptor is closed.
+     * The wrapper itself doesn't own any resources, so it only calls the underlying
+     * {@link org.apache.kafka.clients.producer.ProducerInterceptor#close()}.
+     */
+    @Override
+    public void close() {
+        kafkaProducerInterceptor.close();
+    }
+
+    /**
+     * Converts a Pulsar {@link Message} to a Kafka {@link ProducerRecord}, passes it to the underlying
+     * {@link org.apache.kafka.clients.producer.ProducerInterceptor#onSend(ProducerRecord)}, then converts the
+     * output back to a Pulsar {@link Message}.
+     *
+     * When a Pulsar {@link Message} is built in {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage},
+     * its schema, event time, partition ID, key and value are set. All of this information is preserved during
+     * the conversion.
+     *
+     * @param producer the producer which contains the interceptor; ignored, as Kafka's
+     *                 {@link org.apache.kafka.clients.producer.ProducerInterceptor} doesn't use it.
+     * @param message message to send
+     * @return Processed message.
+     */
+    @Override
+    public Message beforeSend(Producer producer, Message message) {
+        return toPulsarMessage(kafkaProducerInterceptor.onSend(toKafkaRecord(message)));
+    }
+
+    /**
+     * Delegates work to {@link org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement}.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message the message that the application sends
+     * @param msgId the message id assigned by the broker; null if the send failed.
+     * @param exception the exception on sending messages; null indicates the send has succeeded.
+     */
+    @Override
+    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+        try {
+            PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl) message).getMessageBuilder();
+            partitionID = getPartitionID(messageMetadataBuilder);
+            TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
+            kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
+                    -1,
+                    -1,
+                    messageMetadataBuilder.getEventTime(),
+                    -1,
+                    message.getKeyBytes().length,
+                    message.getValue().length),
+                    exception == null ? null : new Exception(exception));
+        } catch (NumberFormatException e) {
+            String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
+            log.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
+    }
+
+    /**
+     * Convert a Kafka {@link ProducerRecord} to a Pulsar {@link Message}.
+     *
+     * @param producerRecord Kafka record to be converted.
+     * @return Pulsar message.
+     */
+    private Message toPulsarMessage(ProducerRecord<K, V> producerRecord) {
+        TypedMessageBuilderImpl<byte[]> typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
+        typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
+        typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
+        typedMessageBuilder.eventTime(eventTime);
+        typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
+        return typedMessageBuilder.getMessage();
+    }
+
+    /**
+     * Convert a Pulsar {@link Message} to a Kafka {@link ProducerRecord}.
+     * First it stores the fields a Kafka record doesn't need, such as the schema.
+     * Then it deserializes the value, as it was serialized to byte[] when the message was created.
+     *
+     * @param message Pulsar message to be converted.
+     * @return Kafka record.
+     */
+    private ProducerRecord<K, V> toKafkaRecord(Message message) {
+        Deserializer valueDeserializer = getDeserializer(valueSerializer);
+        V value = (V) valueDeserializer.deserialize(topic, message.getValue());
+        try {
+            scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
+            PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl) message).getMessageBuilder();
+            partitionID = getPartitionID(messageMetadataBuilder);
+            eventTime = message.getEventTime();
+            return new ProducerRecord<>(topic, Integer.parseInt(partitionID), eventTime, deserializeKey(topic, message.getKey()), value);
+        } catch (NumberFormatException e) {
+            // If the partitionID can't be parsed, ignore it.
+            return new ProducerRecord<>(topic, deserializeKey(topic, message.getKey()), value);
+        } catch (IllegalAccessException e) {
+            String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
+            log.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
+    }
+
+    private String serializeKey(String topic, K key) {
+        // If the key is a String, use it as is; otherwise serialize it to byte[] and encode in base64.
+        if (keySerializer instanceof StringSerializer) {
+            return (String) key;
+        } else {
+            byte[] keyBytes = keySerializer.serialize(topic, key);
+            return Base64.getEncoder().encodeToString(keyBytes);
+        }
+    }
+
+    private K deserializeKey(String topic, String key) {
+        // If the key is a String, use it as is; otherwise decode it from base64 and deserialize the byte[].
+        if (keySerializer instanceof StringSerializer) {
+            return (K) key;
+        } else {
+            Deserializer keyDeserializer = getDeserializer(keySerializer);
+            return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
+        }
+    }
+
+    /**
+     * Try to get the partitionID from messageMetadataBuilder.
+     * As it is set in {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage}, a partitionID
+     * is guaranteed to be returned.
+     *
+     * @param messageMetadataBuilder metadata builder of the message
+     * @return the partitionID
+     */
+    private String getPartitionID(PulsarApi.MessageMetadata.Builder messageMetadataBuilder) {
+        return messageMetadataBuilder.getPropertiesList()
+                .stream()
+                .filter(keyValue -> keyValue.getKey().equals(KafkaMessageRouter.PARTITION_ID))
+                .findFirst()
+                .get()
+                .getValue();
+    }
+
+    private Deserializer getDeserializer(Serializer serializer) {
+        if (serializer instanceof StringSerializer) {
+            return new StringDeserializer();
+        } else if (serializer instanceof LongSerializer) {
+            return new LongDeserializer();
+        } else if (serializer instanceof IntegerSerializer) {
+            return new IntegerDeserializer();
+        } else if (serializer instanceof DoubleSerializer) {
+            return new DoubleDeserializer();
+        } else if (serializer instanceof BytesSerializer) {
+            return new BytesDeserializer();
+        } else if (serializer instanceof ByteBufferSerializer) {
+            return new ByteBufferDeserializer();
+        } else if (serializer instanceof ByteArraySerializer) {
+            return new ByteArrayDeserializer();
+        } else {
+            throw new IllegalArgumentException(serializer.getClass().getName()
+                + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
+        }
+    }
+}
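The non-String key path in `serializeKey`/`deserializeKey` is the subtle part of the conversion: the key must survive a trip through the Pulsar message's String key field. A self-contained sketch of that round trip, using the same Kafka serialization classes and `Base64` as the wrapper (the class name and `main` harness are illustrative only):

```java
import java.util.Base64;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class KeyRoundTripSketch {
    public static void main(String[] args) {
        IntegerSerializer serializer = new IntegerSerializer();
        IntegerDeserializer deserializer = new IntegerDeserializer();

        // serializeKey: a non-String key is serialized to byte[] and base64-encoded,
        // so it can travel in the Pulsar message's String key field.
        Integer original = 42;
        String encoded = Base64.getEncoder()
                .encodeToString(serializer.serialize("my-topic", original));

        // deserializeKey: decode the base64 text and deserialize the bytes back.
        Integer restored = deserializer.deserialize("my-topic",
                Base64.getDecoder().decode(encoded));

        System.out.println(original.equals(restored)); // prints "true"
    }
}
```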
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 37ff221032eb0..2c4af1e047102 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -19,12 +19,18 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
@@ -35,13 +41,18 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -50,7 +61,7 @@
 import static org.mockito.Mockito.when;
 
 @PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
 
     @ObjectFactory
@@ -92,12 +103,60 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
         properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
 
-        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+        new PulsarKafkaProducer<>(properties, null, null);
 
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
         verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testPulsarKafkaInterceptor() throws PulsarClientException {
+        // Arrange
+        PulsarClient mockClient = mock(PulsarClient.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        CompletableFuture mockPartitionFuture = new CompletableFuture();
+        CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+        TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+        mockPartitionFuture.complete(new ArrayList<>());
+        mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+        doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        List<String> interceptors = new ArrayList<>();
+        interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+
+        // Act
+        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, "key", "value"));
+
+        // Verify
+        verify(mockProducerBuilder, times(1)).intercept(anyVararg());
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
     public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
         Properties properties = new Properties();
@@ -110,4 +169,27 @@ public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
         new PulsarKafkaProducer<>(properties, null, null);
     }
 
+    public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
+
+        @Override
+        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
+            return record;
+        }
+
+        @Override
+        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+    }
+
 }
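The test above registers `PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor` by class name because `getConfiguredInstances` constructs each entry of `interceptor.classes` reflectively and then calls `configure(...)` on it — hence the public static class with a no-arg constructor. A standalone example of an interceptor satisfying that contract (the `AuditInterceptor` name and counting logic are illustrative, not part of this patch):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Illustrative interceptor: counts outgoing records and passes them through unchanged.
public class AuditInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicLong sent = new AtomicLong();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        sent.incrementAndGet();
        return record; // must return a record (possibly modified), never null
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // exception is non-null when the send failed
    }

    @Override
    public void close() {
        // release interceptor-owned resources here
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // called once by getConfiguredInstances() with the producer config
    }
}
```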
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
new file mode 100644
index 0000000000000..8da6339e8a5bc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.impl.ProducerInterceptors;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class KafkaProducerInterceptorWrapperTest {
+
+    /**
+     * This test makes sure that no information is lost when a Pulsar message is converted
+     * to a Kafka record and back to a Pulsar message.
+     */
+    @Test
+    public void testProducerInterceptorConvertRecordCorrectly() {
+
+        String topic = "topic name";
+        int partitionID = 666;
+        long timeStamp = Math.abs(new Random().nextLong());
+
+        org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]> mockInterceptor1 =
+            (org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+                Assert.assertEquals(record.key(), "original key");
+                Assert.assertEquals(record.value(), "original value".getBytes());
+                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+                Assert.assertEquals(record.partition().intValue(), partitionID);
+                return new ProducerRecord<>(topic, "processed key", "processed value".getBytes());
+            }
+        }).when(mockInterceptor1).onSend(any(ProducerRecord.class));
+
+        org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]> mockInterceptor2 =
+            (org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+                Assert.assertEquals(record.key(), "processed key");
+                Assert.assertEquals(record.value(), "processed value".getBytes());
+                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+                Assert.assertEquals(record.partition().intValue(), partitionID);
+                return record;
+            }
+        }).when(mockInterceptor2).onSend(any(ProducerRecord.class));
+
+        ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
+            new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
+            new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
+
+        TypedMessageBuilderImpl<byte[]> typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
+        typedMessageBuilder.key("original key");
+        typedMessageBuilder.value("original value".getBytes());
+        typedMessageBuilder.eventTime(timeStamp);
+        typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, String.valueOf(partitionID));
+
+        producerInterceptors.beforeSend(null, typedMessageBuilder.getMessage());
+
+        verify(mockInterceptor1, times(1)).onSend(any(ProducerRecord.class));
+        verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
+    }
+
+}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 924192d8e7771..cb97fe2e8f416 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -137,7 +137,7 @@ Properties:
 | `client.id` | Ignored | |
 | `compression.type` | Yes | Allows `gzip` and `lz4`. No `snappy`. |
 | `connections.max.idle.ms` | Yes | Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time|
-| `interceptor.classes` | Ignored | |
+| `interceptor.classes` | Yes | |
 | `key.serializer` | Yes | |
 | `linger.ms` | Yes | Controls the group commit time when batching messages |
 | `max.block.ms` | Ignored | |
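With `interceptor.classes` now honored, an existing Kafka application can keep its interceptor configuration when pointed at Pulsar through the compat wrapper. A minimal usage sketch — the service URL, topic, and `com.example.AuditInterceptor` are placeholders, and passing null serializers falls back to the classes configured in the properties, as in the tests above:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.PulsarKafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class InterceptorUsageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The compat wrapper takes a Pulsar service URL where a Kafka app would list brokers.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "pulsar://localhost:6650");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Previously ignored by the compat layer; now wrapped per topic and handed
        // to the Pulsar ProducerBuilder via intercept(...).
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.AuditInterceptor");

        // Null serializers fall back to the classes configured above.
        Producer<String, String> producer = new PulsarKafkaProducer<>(props, null, null);
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}
```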