Skip to content

Commit

Permalink
Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKa…
Browse files Browse the repository at this point in the history
…fkaConsumer. (apache#3911)

* Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer.
Apply onConsume in poll() before returning the ConsumerRecords,
apply onCommit in doCommitOffsets() before committing all offsets.

Also apply doc to reflect support for ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.

* Update error message for applying onConsume and onCommit for interceptors to include the specific interceptor name.
  • Loading branch information
MarvinCai authored and merlimat committed Mar 28, 2019
1 parent ee395ca commit 0487804
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
private final SubscriptionInitialPosition strategy;

private List<ConsumerInterceptor<K, V>> interceptors;

private volatile boolean closed = false;

private final int maxRecordsInSinglePoll;
Expand Down Expand Up @@ -162,6 +164,9 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ
maxRecordsInSinglePoll = 1000;
}

interceptors = (List) config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);

this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
Expand Down Expand Up @@ -374,7 +379,8 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
commitAsync();
}

return new ConsumerRecords<>(records);
// If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -438,6 +444,7 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
List<CompletableFuture<Void>> futures = new ArrayList<>();

applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
Expand All @@ -457,6 +464,43 @@ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
return offsets;
}

/**
* Apply all onConsume methods in a list of ConsumerInterceptors.
* Catch any exception during the process.
*
* @param interceptors Interceptors provided.
* @param consumerRecords ConsumerRecords returned by calling {@link this#poll(long)}.
* @return ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
*/
private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
ConsumerRecords processedConsumerRecords = consumerRecords;
for (ConsumerInterceptor interceptor : interceptors) {
try {
processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
} catch (Exception e) {
log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
}
}
return processedConsumerRecords;
}

/**
* Apply all onCommit methods in a list of ConsumerInterceptors.
* Catch any exception during the process.
*
* @param interceptors Interceptors provided.
* @param offsets Offsets need to be commit.
*/
private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
for (ConsumerInterceptor interceptor : interceptors) {
try {
interceptor.onCommit(offsets);
} catch (Exception e) {
log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
}
}
}

@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
Expand Down
1 change: 1 addition & 0 deletions site2/docs/adaptors-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ Properties:
| `fetch.min.bytes` | Ignored | |
| `fetch.max.bytes` | Ignored | |
| `fetch.max.wait.ms` | Ignored | |
| `interceptor.classes` | Yes | |
| `metadata.max.age.ms` | Ignored | |
| `max.partition.fetch.bytes` | Ignored | |
| `send.buffer.bytes` | Ignored | |
Expand Down

0 comments on commit 0487804

Please sign in to comment.