Skip to content

Commit

Permalink
MINOR: add docs to remind reader that impl of ConsumerPartitionAssign… (
Browse files Browse the repository at this point in the history
apache#13659)

Reviewers: David Jacot <[email protected]>, Kirk True <[email protected]>
  • Loading branch information
chia7712 authored May 5, 2023
1 parent 6bcc497 commit 6e7144a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
* as the group coordinator. The coordinator selects one member to perform the group assignment and
* propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
* to perform the assignment and the results are forwarded back to each respective members
*
* <p>
* In some cases, it is useful to forward additional metadata to the assignor in order to make
* assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
* <p>
* The implementation can extend {@link Configurable} to get configs from consumer.
*/
public interface ConsumerPartitionAssignor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@


import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -108,7 +112,21 @@ public void shouldThrowKafkaExceptionOnAssignorsWithSameName() {
));
}

public static class TestConsumerPartitionAssignor implements ConsumerPartitionAssignor {
@Test
public void shouldBeConfigurable() {
Map<String, Object> configs = Collections.singletonMap("key", "value");
List<ConsumerPartitionAssignor> assignors = getAssignorInstances(
Collections.singletonList(TestConsumerPartitionAssignor.class.getName()),
configs
);
assertEquals(1, assignors.size());
assertInstanceOf(TestConsumerPartitionAssignor.class, assignors.get(0));
assertEquals(configs, ((TestConsumerPartitionAssignor) assignors.get(0)).configs);
}


public static class TestConsumerPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
private Map<String, ?> configs = null;

@Override
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
Expand All @@ -120,6 +138,11 @@ public String name() {
// use the RangeAssignor's name to cause naming conflict
return new RangeAssignor().name();
}

@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
}
}

private ConsumerConfig initConsumerConfigWithClassTypes(List<Object> classTypes) {
Expand Down

0 comments on commit 6e7144a

Please sign in to comment.