Skip to content

Commit

Permalink
Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to config max …
Browse files Browse the repository at this point in the history
…number of message will return in a single poll. (apache#3887)

Also update doc to reflect that we already supporting earlist and latest strategy for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
  • Loading branch information
MarvinCai authored and merlimat committed Mar 26, 2019
1 parent 52659ae commit b42c1e5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene

private volatile boolean closed = false;

private final int maxRecordsInSinglePoll;

private final Properties properties;

private static class QueueItem {
Expand Down Expand Up @@ -153,6 +155,13 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);

// If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
} else {
maxRecordsInSinglePoll = 1000;
}

this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
Expand Down Expand Up @@ -304,8 +313,6 @@ public void unsubscribe() {
});
}

private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;

@SuppressWarnings("unchecked")
@Override
public ConsumerRecords<K, V> poll(long timeoutMillis) {
Expand Down Expand Up @@ -354,7 +361,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
lastReceivedOffset.put(tp, offset);
unpolledPartitions.remove(tp);

if (++numberOfRecords >= MAX_RECORDS_IN_SINGLE_POLL) {
if (++numberOfRecords >= maxRecordsInSinglePoll) {
break;
}

Expand Down
4 changes: 2 additions & 2 deletions site2/docs/adaptors-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ Properties:
| Config property | Supported | Notes |
|:--------------------------------|:----------|:------------------------------------------------------|
| `group.id` | Yes | Maps to a Pulsar subscription name |
| `max.poll.records` | Ignored | |
| `max.poll.records` | Yes | |
| `max.poll.interval.ms` | Ignored | Messages are "pushed" from broker |
| `session.timeout.ms` | Ignored | |
| `heartbeat.interval.ms` | Ignored | |
| `bootstrap.servers` | Yes | Needs to point to a single Pulsar service URL |
| `enable.auto.commit` | Yes | |
| `auto.commit.interval.ms` | Ignored | With auto-commit, acks are sent immediately to broker |
| `partition.assignment.strategy` | Ignored | |
| `auto.offset.reset` | Ignored | |
| `auto.offset.reset` | Yes | Only support earliest and latest. |
| `fetch.min.bytes` | Ignored | |
| `fetch.max.bytes` | Ignored | |
| `fetch.max.wait.ms` | Ignored | |
Expand Down

0 comments on commit b42c1e5

Please sign in to comment.