[hotfix] [kafka, tests] Commit read offsets in Kafka integration tests
Previously, offsets were not committed, so the same records could be read more than once.
This was not a big issue because, so far, these methods were used only for at-least-once tests.

This closes apache#4310.
pnowojski authored and tzulitai committed Jul 24, 2017
1 parent 58b5374 commit c65afdb
Showing 3 changed files with 45 additions and 38 deletions.
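
All three changes below apply the same pattern: wrap the KafkaConsumer in a try-with-resources block, drain the partition until a poll returns no records, and commit the read offsets before closing. The standalone sketch below illustrates that pattern with the plain Kafka consumer API; the broker address, topic name, group id, deserializers, and poll timeout are illustrative assumptions, not values taken from this commit.

// Sketch of the drain-and-commit pattern introduced by this commit.
// All connection details below are assumptions for illustration only.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReadAllAndCommitSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");              // assumed local broker
		props.put("group.id", "read-all-sketch");                      // offsets are committed under this group
		props.put("enable.auto.commit", "false");                      // commit explicitly, like the fixed tests
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", ByteArrayDeserializer.class.getName());
		props.put("value.deserializer", ByteArrayDeserializer.class.getName());

		List<ConsumerRecord<byte[], byte[]>> result = new ArrayList<>();

		// try-with-resources closes the consumer even if polling throws
		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			consumer.assign(Collections.singletonList(new TopicPartition("test-topic", 0)));

			while (true) {
				boolean processedAtLeastOneRecord = false;

				// wait up to the timeout for new records; an empty poll ends the loop
				Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumer.poll(1000L).iterator();
				while (iterator.hasNext()) {
					result.add(iterator.next());
					processedAtLeastOneRecord = true;
				}

				if (!processedAtLeastOneRecord) {
					break;
				}
			}

			// the actual fix: persist the read positions so a later consumer in the
			// same group does not read these records again
			consumer.commitSync();
		}

		System.out.println("Read " + result.size() + " records");
	}
}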
First changed file:

@@ -127,23 +127,26 @@ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeser
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
-
-			// wait for new records with timeout and break the loop if we didn't get any
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
-
-			if (!processedAtLeastOneRecord) {
-				break;
+
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				// wait for new records with timeout and break the loop if we didn't get any
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);
Second changed file:

@@ -118,19 +118,21 @@ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeser
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.subscribe(new TopicPartition(topic, partition));
-
-		while (true) {
-			Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
-			if (topics == null || !topics.containsKey(topic)) {
-				break;
-			}
-			List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
-			result.addAll(records);
-			if (records.size() == 0) {
-				break;
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.subscribe(new TopicPartition(topic, partition));
+
+			while (true) {
+				Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+				if (topics == null || !topics.containsKey(topic)) {
+					break;
+				}
+				List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+				result.addAll(records);
+				if (records.size() == 0) {
+					break;
+				}
 			}
+			consumer.commit(true);
 		}
 
 		return UnmodifiableList.decorate(result);
Third changed file:

@@ -110,22 +110,24 @@ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeser
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
-
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
-
-			if (!processedAtLeastOneRecord) {
-				break;
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);
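
Why the commit call matters: Kafka stores the committed position per group.id, so a later consumer created with the same group (and without an explicit seek) resumes after the records already returned instead of reading them again, which is what previously caused duplicate reads in these helpers. Below is a self-contained sketch that inspects the committed position; the broker address, topic, and group id are again illustrative assumptions.

// Sketch only: check what offset a consumer group would resume from after a
// drain-and-commit like the one above. Connection details are assumptions.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommittedOffsetCheckSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
		props.put("group.id", "read-all-sketch");            // same (assumed) group as the earlier drain
		props.put("enable.auto.commit", "false");
		props.put("key.deserializer", ByteArrayDeserializer.class.getName());
		props.put("value.deserializer", ByteArrayDeserializer.class.getName());

		TopicPartition partition = new TopicPartition("test-topic", 0); // assumed topic

		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			consumer.assign(Collections.singletonList(partition));

			// null means nothing has been committed for this group and partition yet
			OffsetAndMetadata committed = consumer.committed(partition);
			System.out.println(committed == null
					? "no committed offset yet"
					: "next read for this group starts at offset " + committed.offset());
		}
	}
}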
