Skip to content

Commit

Permalink
Fix kafka_0_9 Consumer partition topic throw error (apache#7179)
Browse files Browse the repository at this point in the history
Master Issue: apache#7178
## Motivation

When using pulsar-client-kafka_0_9 version to consume partition topic, will throw java.lang.ClassCastException,  
because 
```
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
```
## Modifications
BrokerService
```java
 public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();

        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                // Create individual subscription on each partition, that way we can keep using the
                // acknowledgeCumulative()
                int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener(this);
                consumerBuilder.subscriptionName(groupId);
                //consumerBuilder.topics(topics);
                if (numberOfPartitions > 1) {
                    // Subscribe to each partition
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < numberOfPartitions; i++) {
                        String partitionName = TopicName.get(topic).getPartition(i).toString();
                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                .topic(partitionName).subscribeAsync();
                        int partitionIndex = i;
                        TopicPartition tp = new TopicPartition(
                                TopicName.get(topic).getPartitionedTopicName(),
                                partitionIndex);
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
```
We should remove consumerBuilder.topics(topics)。
  • Loading branch information
liudezhi2098 authored Jun 8, 2020
1 parent 4beebb3 commit d8ab1a2
Showing 1 changed file with 0 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
consumerBuilder.topics(topics);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());
Expand Down

0 comments on commit d8ab1a2

Please sign in to comment.